Online config change

Summary: This diff is intended to forward port HBASE-8805 and HBASE-8544 implemented by Gaurav Menghani in 89-fb. This improves operational efficiency in managing clusters that are serving production traffic. The idea is to have a central configuration which can manage notifying the configuration observers. The observers in turn should update their local state from the latest config. Minor caveats where configuration variables are corelated should be taken care of with additional care.

Test Plan: Unit tests

Differential Revision: https://reviews.facebook.net/D24681

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
manukranthk 2014-10-01 15:13:58 -07:00 committed by stack
parent a47fe89d95
commit 962065de72
14 changed files with 811 additions and 41 deletions

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.conf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
/**
* Maintains the set of all the classes which would like to get notified
* when the Configuration is reloaded from the disk in the Online Configuration
* Change mechanism, which lets you update certain configuration properties
* on-the-fly, without having to restart the cluster.
*
* If a class has configuration properties which you would like to be able to
* change on-the-fly, do the following:
* 1. Implement the {@link ConfigurationObserver} interface. This would require
* you to implement the
* {@link ConfigurationObserver#onConfigurationChange(Configuration)}
* method. This is a callback that is used to notify your class' instance
* that the configuration has changed. In this method, you need to check
* if the new values for the properties that are of interest to your class
* are different from the cached values. If yes, update them.
*
* However, be careful with this. Certain properties might be trivially
* mutable online, but others might not. Two properties might be trivially
* mutable by themselves, but not when changed together. For example, if a
* method uses properties "a" and "b" to make some decision, and is running
* in parallel when the notifyOnChange() method updates "a", but hasn't
* yet updated "b", it might make a decision on the basis of a new value of
* "a", and an old value of "b". This might introduce subtle bugs. This
* needs to be dealt on a case-by-case basis, and this class does not provide
* any protection from such cases.
*
* 2. Register the appropriate instance of the class with the
* {@link ConfigurationManager} instance, using the
* {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
* method. For the RS side of things, the ConfigurationManager is a static
* member of the {@link org.apache.hadoop.hbase.regionserver.HRegionServer}
* class. Be careful not to do this in the constructor, as you might cause
* the 'this' reference to escape. Use a factory method, or an initialize()
* method which is called after the construction of the object.
*
* 3. Deregister the instance using the
* {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
* method when it is going out of scope. In case you are not able to do that
* for any reason, it is still okay, since entries for dead observers are
* automatically collected during GC. But nonetheless, it is still a good
* practice to deregister your observer, whenever possible.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ConfigurationManager {
public static final Log LOG = LogFactory.getLog(ConfigurationManager.class);
// The set of Configuration Observers. These classes would like to get
// notified when the configuration is reloaded from disk. This is a set
// constructed from a WeakHashMap, whose entries would be removed if the
// observer classes go out of scope.
private Set<ConfigurationObserver> configurationObservers =
Collections.newSetFromMap(new WeakHashMap<ConfigurationObserver,
Boolean>());
/**
* Register an observer class
* @param observer
*/
public void registerObserver(ConfigurationObserver observer) {
synchronized (configurationObservers) {
configurationObservers.add(observer);
if (observer instanceof PropagatingConfigurationObserver) {
((PropagatingConfigurationObserver) observer).registerChildren(this);
}
}
}
/**
* Deregister an observer class
* @param observer
*/
public void deregisterObserver(ConfigurationObserver observer) {
synchronized (configurationObservers) {
configurationObservers.remove(observer);
if (observer instanceof PropagatingConfigurationObserver) {
((PropagatingConfigurationObserver) observer).deregisterChildren(this);
}
}
}
/**
* The conf object has been repopulated from disk, and we have to notify
* all the observers that are expressed interest to do that.
*/
public void notifyAllObservers(Configuration conf) {
synchronized (configurationObservers) {
for (ConfigurationObserver observer : configurationObservers) {
try {
if (observer != null) {
observer.onConfigurationChange(conf);
}
} catch (Throwable t) {
LOG.error("Encountered a throwable while notifying observers: " + " of type : " +
observer.getClass().getCanonicalName() + "(" + observer + ")", t);
}
}
}
}
/**
* @return the number of observers.
*/
public int getNumObservers() {
synchronized (configurationObservers) {
return configurationObservers.size();
}
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Every class that wants to observe changes in Configuration properties,
* must implement interface (and also, register itself with the
* <code>ConfigurationManager</code> object.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ConfigurationObserver {
/**
* This method would be called by the {@link ConfigurationManager}
* object when the {@link Configuration} object is reloaded from disk.
*/
void onConfigurationChange(Configuration conf);
}

View File

@ -0,0 +1,43 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.conf;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This extension to ConfigurationObserver allows the configuration to propagate to the children of
* the current {@link ConfigurationObserver}. This is the preferred way to make a class online
* configurable because it allows the user to configure the children in a recursive manner
* automatically.
*/
@InterfaceAudience.Private
public interface PropagatingConfigurationObserver extends ConfigurationObserver {
/**
* Needs to be called to register the children to the manager.
* @param manager : to register to
*/
void registerChildren(ConfigurationManager manager);
/**
* Needs to be called to deregister the children from the manager.
* @param manager : to deregister from
*/
void deregisterChildren(ConfigurationManager manager);
}

View File

@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -50,9 +52,27 @@ import com.google.common.base.Preconditions;
* Compact region on request and then run split if appropriate
*/
@InterfaceAudience.Private
public class CompactSplitThread implements CompactionRequestor {
public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
// Configuration key for the large compaction threads.
public final static String LARGE_COMPACTION_THREADS =
"hbase.regionserver.thread.compaction.large";
public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
// Configuration key for the small compaction threads.
public final static String SMALL_COMPACTION_THREADS =
"hbase.regionserver.thread.compaction.small";
public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
// Configuration key for split threads
public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
public final static int SPLIT_THREADS_DEFAULT = 1;
// Configuration keys for merge threads
public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
public final static int MERGE_THREADS_DEFAULT = 1;
private final HRegionServer server;
private final Configuration conf;
@ -77,11 +97,11 @@ public class CompactSplitThread implements CompactionRequestor {
Integer.MAX_VALUE);
int largeThreads = Math.max(1, conf.getInt(
"hbase.regionserver.thread.compaction.large", 1));
LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
int smallThreads = conf.getInt(
"hbase.regionserver.thread.compaction.small", 1);
SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
// if we have throttle threads, make sure the user also specified size
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
@ -121,7 +141,7 @@ public class CompactSplitThread implements CompactionRequestor {
return t;
}
});
int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
mergeThreads, new ThreadFactory() {
@Override
@ -147,7 +167,7 @@ public class CompactSplitThread implements CompactionRequestor {
queueLists.append("Compaction/Split Queue dump:\n");
queueLists.append(" LargeCompation Queue:\n");
BlockingQueue<Runnable> lq = longCompactions.getQueue();
Iterator it = lq.iterator();
Iterator<Runnable> it = lq.iterator();
while(it.hasNext()){
queueLists.append(" "+it.next().toString());
queueLists.append("\n");
@ -540,4 +560,84 @@ public class CompactSplitThread implements CompactionRequestor {
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void onConfigurationChange(Configuration newConf) {
// Check if number of large / small compaction threads has changed, and then
// adjust the core pool size of the thread pools, by using the
// setCorePoolSize() method. According to the javadocs, it is safe to
// change the core pool size on-the-fly. We need to reset the maximum
// pool size, as well.
int largeThreads = Math.max(1, newConf.getInt(
LARGE_COMPACTION_THREADS,
LARGE_COMPACTION_THREADS_DEFAULT));
if (this.longCompactions.getCorePoolSize() != largeThreads) {
LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
" from " + this.longCompactions.getCorePoolSize() + " to " +
largeThreads);
this.longCompactions.setMaximumPoolSize(largeThreads);
this.longCompactions.setCorePoolSize(largeThreads);
}
int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
SMALL_COMPACTION_THREADS_DEFAULT);
if (this.shortCompactions.getCorePoolSize() != smallThreads) {
LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
" from " + this.shortCompactions.getCorePoolSize() + " to " +
smallThreads);
this.shortCompactions.setMaximumPoolSize(smallThreads);
this.shortCompactions.setCorePoolSize(smallThreads);
}
int splitThreads = newConf.getInt(SPLIT_THREADS,
SPLIT_THREADS_DEFAULT);
if (this.splits.getCorePoolSize() != splitThreads) {
LOG.info("Changing the value of " + SPLIT_THREADS +
" from " + this.splits.getCorePoolSize() + " to " +
splitThreads);
this.splits.setMaximumPoolSize(smallThreads);
this.splits.setCorePoolSize(smallThreads);
}
int mergeThreads = newConf.getInt(MERGE_THREADS,
MERGE_THREADS_DEFAULT);
if (this.mergePool.getCorePoolSize() != mergeThreads) {
LOG.info("Changing the value of " + MERGE_THREADS +
" from " + this.mergePool.getCorePoolSize() + " to " +
mergeThreads);
this.mergePool.setMaximumPoolSize(smallThreads);
this.mergePool.setCorePoolSize(smallThreads);
}
// We change this atomically here instead of reloading the config in order that upstream
// would be the only one with the flexibility to reload the config.
this.conf.reloadConfiguration();
}
protected int getSmallCompactionThreadNum() {
return this.shortCompactions.getCorePoolSize();
}
public int getLargeCompactionThreadNum() {
return this.longCompactions.getCorePoolSize();
}
/**
* {@inheritDoc}
*/
@Override
public void registerChildren(ConfigurationManager manager) {
// No children to register.
}
/**
* {@inheritDoc}
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
// No children to register
}
}

View File

@ -97,6 +97,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@ -150,6 +152,7 @@ import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -197,7 +200,7 @@ import com.google.protobuf.Service;
* defines the keyspace for this HRegion.
*/
@InterfaceAudience.Private
public class HRegion implements HeapSize { // , Writable{
public class HRegion implements HeapSize, PropagatingConfigurationObserver { // , Writable{
public static final Log LOG = LogFactory.getLog(HRegion.class);
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
@ -348,6 +351,8 @@ public class HRegion implements HeapSize { // , Writable{
// when a region is in recovering state, it can only accept writes not reads
private volatile boolean isRecovering = false;
private volatile Optional<ConfigurationManager> configurationManager;
/**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
@ -664,6 +669,7 @@ public class HRegion implements HeapSize { // , Writable{
this.disallowWritesInRecovering =
conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
configurationManager = Optional.absent();
}
void setHTableSpecificConf() {
@ -5747,7 +5753,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(12 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
@ -6488,4 +6494,33 @@ public class HRegion implements HeapSize { // , Writable{
this.log.sync();
}
}
/**
* {@inheritDoc}
*/
@Override
public void onConfigurationChange(Configuration conf) {
// Do nothing for now.
}
/**
* {@inheritDoc}
*/
@Override
public void registerChildren(ConfigurationManager manager) {
configurationManager = Optional.of(manager);
for (Store s : this.stores.values()) {
configurationManager.get().registerObserver(s);
}
}
/**
* {@inheritDoc}
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
for (Store s : this.stores.values()) {
configurationManager.get().deregisterObserver(s);
}
}
}

View File

@ -76,6 +76,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@ -436,6 +438,12 @@ public class HRegionServer extends HasThread implements
protected BaseCoordinatedStateManager csm;
/**
* Configuration manager is used to register/deregister and notify the configuration observers
* when the regionserver is notified that there was a change in the on disk configs.
*/
private final ConfigurationManager configurationManager;
/**
* Starts a HRegionServer at the default location.
* @param conf
@ -534,6 +542,7 @@ public class HRegionServer extends HasThread implements
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
}
this.configurationManager = new ConfigurationManager();
rpcServices.start();
putUpWebUI();
@ -769,6 +778,12 @@ public class HRegionServer extends HasThread implements
if (storefileRefreshPeriod > 0) {
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
}
registerConfigurationObservers();
}
private void registerConfigurationObservers() {
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
}
/**
@ -2284,6 +2299,7 @@ public class HRegionServer extends HasThread implements
@Override
public void addToOnlineRegions(HRegion region) {
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
configurationManager.registerObserver(region);
}
/**
@ -3090,4 +3106,12 @@ public class HRegionServer extends HasThread implements
public CacheConfig getCacheConfig() {
return this.cacheConfig;
}
/**
* @return : Returns the ConfigurationManager object for testing purposes.
*/
protected ConfigurationManager getConfigurationManager() {
return configurationManager;
}
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
@ -90,6 +91,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
@ -135,7 +137,7 @@ public class HStore implements Store {
private final HRegion region;
private final HColumnDescriptor family;
private final HRegionFileSystem fs;
private final Configuration conf;
private Configuration conf;
private final CacheConfig cacheConf;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
@ -178,7 +180,7 @@ public class HStore implements Store {
final StoreEngine<?, ?, ?, ?> storeEngine;
private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
private final OffPeakHours offPeakHours;
private volatile OffPeakHours offPeakHours;
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
private int flushRetriesNumber;
@ -2202,4 +2204,44 @@ public class HStore implements Store {
public long getMajorCompactedCellsSize() {
return majorCompactedCellsSize;
}
/**
* Returns the StoreEngine that is backing this concrete implementation of Store.
* @return Returns the {@link StoreEngine} object used internally inside this HStore object.
*/
protected StoreEngine<?, ?, ?, ?> getStoreEngine() {
return this.storeEngine;
}
protected OffPeakHours getOffPeakHours() {
return this.offPeakHours;
}
/**
* {@inheritDoc}
*/
@Override
public void onConfigurationChange(Configuration conf) {
this.conf = new CompoundConfiguration()
.add(conf)
.addBytesMap(family.getValues());
this.storeEngine.compactionPolicy.setConf(conf);
this.offPeakHours = OffPeakHours.getInstance(conf);
}
/**
* {@inheritDoc}
*/
@Override
public void registerChildren(ConfigurationManager manager) {
// No children to register
}
/**
* {@inheritDoc}
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
// No children to deregister
}
}

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -48,7 +50,7 @@ import org.apache.hadoop.hbase.util.Pair;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface Store extends HeapSize, StoreConfigInformation {
public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
* Set parameter as "hbase.hstore.compaction.<attribute>"
*/
//TODO: revisit this class for online parameter updating (both in xml and on the CF)
@InterfaceAudience.Private
public class CompactionConfiguration {
@ -55,19 +54,22 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_COMPACTION_MAX_KEY = "hbase.hstore.compaction.max";
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY =
"hbase.hstore.compaction.max.size";
public static final String HBASE_HSTORE_OFFPEAK_END_HOUR = "hbase.offpeak.end.hour";
public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour";
Configuration conf;
StoreConfigInformation storeConfigInfo;
long maxCompactSize;
long minCompactSize;
int minFilesToCompact;
int maxFilesToCompact;
double compactionRatio;
double offPeekCompactionRatio;
long throttlePoint;
long majorCompactionPeriod;
float majorCompactionJitter;
private final double offPeakCompactionRatio;
/** Since all these properties can change online, they are volatile **/
private final long maxCompactSize;
private final long minCompactSize;
private final int minFilesToCompact;
private final int maxFilesToCompact;
private final double compactionRatio;
private final long throttlePoint;
private final long majorCompactionPeriod;
private final float majorCompactionJitter;
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
@ -80,9 +82,9 @@ public class CompactionConfiguration {
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
maxFilesToCompact = conf.getInt(HBASE_HSTORE_COMPACTION_MAX_KEY, 10);
compactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
offPeekCompactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_OFFPEAK_KEY, 5.0F);
offPeakCompactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_OFFPEAK_KEY, 5.0F);
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
@ -101,7 +103,7 @@ public class CompactionConfiguration {
minFilesToCompact,
maxFilesToCompact,
compactionRatio,
offPeekCompactionRatio,
offPeakCompactionRatio,
throttlePoint,
majorCompactionPeriod,
majorCompactionJitter);
@ -110,49 +112,49 @@ public class CompactionConfiguration {
/**
* @return lower bound below which compaction is selected without ratio test
*/
long getMinCompactSize() {
public long getMinCompactSize() {
return minCompactSize;
}
/**
* @return upper bound on file size to be included in minor compactions
*/
long getMaxCompactSize() {
public long getMaxCompactSize() {
return maxCompactSize;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
int getMinFilesToCompact() {
public int getMinFilesToCompact() {
return minFilesToCompact;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
int getMaxFilesToCompact() {
public int getMaxFilesToCompact() {
return maxFilesToCompact;
}
/**
* @return Ratio used for compaction
*/
double getCompactionRatio() {
public double getCompactionRatio() {
return compactionRatio;
}
/**
* @return Off peak Ratio used for compaction
*/
double getCompactionRatioOffPeak() {
return offPeekCompactionRatio;
public double getCompactionRatioOffPeak() {
return offPeakCompactionRatio;
}
/**
* @return ThrottlePoint used for classifying small and large compactions
*/
long getThrottlePoint() {
public long getThrottlePoint() {
return throttlePoint;
}
@ -160,7 +162,7 @@ public class CompactionConfiguration {
* @return Major compaction period from compaction.
* Major compactions are selected periodically according to this parameter plus jitter
*/
long getMajorCompactionPeriod() {
public long getMajorCompactionPeriod() {
return majorCompactionPeriod;
}
@ -168,7 +170,7 @@ public class CompactionConfiguration {
* @return Major the jitter fraction, the fraction within which the major compaction
* period is randomly chosen from the majorCompactionPeriod in each store.
*/
float getMajorCompactionJitter() {
public float getMajorCompactionJitter() {
return majorCompactionJitter;
}
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -62,4 +61,11 @@ public abstract class CompactionPolicy {
public void setConf(Configuration conf) {
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
}
/**
* @return The current compaction configuration settings.
*/
public CompactionConfiguration getConf() {
return this.comConf;
}
}

View File

@ -32,8 +32,8 @@ public abstract class OffPeakHours {
};
public static OffPeakHours getInstance(Configuration conf) {
int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);
int startHour = conf.getInt(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, -1);
int endHour = conf.getInt(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, -1);
return getInstance(startHour, endHour);
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.conf;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.experimental.categories.Category;
@Category({SmallTests.class, ClientTests.class})
public class TestConfigurationManager extends TestCase {
public static final Log LOG = LogFactory.getLog(TestConfigurationManager.class);
class DummyConfigurationObserver implements ConfigurationObserver {
private boolean notifiedOnChange = false;
private ConfigurationManager cm;
public DummyConfigurationObserver(ConfigurationManager cm) {
this.cm = cm;
register();
}
public void onConfigurationChange(Configuration conf) {
notifiedOnChange = true;
}
// Was the observer notified on Configuration change?
public boolean wasNotifiedOnChange() {
return notifiedOnChange;
}
public void resetNotifiedOnChange() {
notifiedOnChange = false;
}
public void register() {
this.cm.registerObserver(this);
}
public void deregister() {
this.cm.deregisterObserver(this);
}
}
/**
* Test if observers get notified by the <code>ConfigurationManager</code>
* when the Configuration is reloaded.
*/
public void testCheckIfObserversNotified() {
Configuration conf = new Configuration();
ConfigurationManager cm = new ConfigurationManager();
DummyConfigurationObserver d1 = new DummyConfigurationObserver(cm);
// Check if we get notified.
cm.notifyAllObservers(conf);
assertTrue(d1.wasNotifiedOnChange());
d1.resetNotifiedOnChange();
// Now check if we get notified on change with more than one observers.
DummyConfigurationObserver d2 = new DummyConfigurationObserver(cm);
cm.notifyAllObservers(conf);
assertTrue(d1.wasNotifiedOnChange());
d1.resetNotifiedOnChange();
assertTrue(d2.wasNotifiedOnChange());
d2.resetNotifiedOnChange();
// Now try deregistering an observer and verify that it was not notified
d2.deregister();
cm.notifyAllObservers(conf);
assertTrue(d1.wasNotifiedOnChange());
d1.resetNotifiedOnChange();
assertFalse(d2.wasNotifiedOnChange());
}
// Register an observer that will go out of scope immediately, allowing
// us to test that out of scope observers are deregistered.
private void registerLocalObserver(ConfigurationManager cm) {
new DummyConfigurationObserver(cm);
}
/**
* Test if out-of-scope observers are deregistered on GC.
*/
public void testDeregisterOnOutOfScope() {
Configuration conf = new Configuration();
ConfigurationManager cm = new ConfigurationManager();
boolean outOfScopeObserversDeregistered = false;
// On my machine, I was able to cause a GC after around 5 iterations.
// If we do not cause a GC in 100k iterations, which is very unlikely,
// there might be something wrong with the GC.
for (int i = 0; i < 100000; i++) {
registerLocalObserver(cm);
cm.notifyAllObservers(conf);
// 'Suggest' the system to do a GC. We should be able to cause GC
// atleast once in the 2000 iterations.
System.gc();
// If GC indeed happened, all the observers (which are all out of scope),
// should have been deregistered.
if (cm.getNumObservers() <= i) {
outOfScopeObserversDeregistered = true;
break;
}
}
if (!outOfScopeObserversDeregistered) {
LOG.warn("Observers were not GC-ed! Something seems to be wrong.");
}
assertTrue(outOfScopeObserversDeregistered);
}
}

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
import java.io.IOException;
/**
* Verify that the Online config Changes on the HRegionServer side are actually
* happening. We should add tests for important configurations which will be
* changed online.
*/
@Category({MediumTests.class})
public class TestRegionServerOnlineConfigChange extends TestCase {
static final Log LOG =
LogFactory.getLog(TestRegionServerOnlineConfigChange.class.getName());
HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
Configuration conf = null;
HTable t1 = null;
HRegionServer rs1 = null;
byte[] r1name = null;
HRegion r1 = null;
final String table1Str = "table1";
final String columnFamily1Str = "columnFamily1";
final byte[] TABLE1 = Bytes.toBytes(table1Str);
final byte[] COLUMN_FAMILY1 = Bytes.toBytes(columnFamily1Str);
@Override
public void setUp() throws Exception {
conf = hbaseTestingUtility.getConfiguration();
hbaseTestingUtility.startMiniCluster(1,1);
t1 = hbaseTestingUtility.createTable(TABLE1, COLUMN_FAMILY1);
@SuppressWarnings("deprecation")
HRegionInfo firstHRI = t1.getRegionLocations().keySet().iterator().next();
r1name = firstHRI.getRegionName();
rs1 = hbaseTestingUtility.getHBaseCluster().getRegionServer(
hbaseTestingUtility.getHBaseCluster().getServerWith(r1name));
r1 = rs1.getRegion(r1name);
}
@Override
public void tearDown() throws Exception {
hbaseTestingUtility.shutdownMiniCluster();
}
/**
* Check if the number of compaction threads changes online
* @throws IOException
*/
public void testNumCompactionThreadsOnlineChange() throws IOException {
assertTrue(rs1.compactSplitThread != null);
int newNumSmallThreads =
rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
int newNumLargeThreads =
rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
conf.setInt("hbase.regionserver.thread.compaction.small",
newNumSmallThreads);
conf.setInt("hbase.regionserver.thread.compaction.large",
newNumLargeThreads);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newNumSmallThreads,
rs1.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(newNumLargeThreads,
rs1.compactSplitThread.getLargeCompactionThreadNum());
}
/**
* Test that the configurations in the CompactionConfiguration class change
* properly.
*
* @throws IOException
*/
public void testCompactionConfigurationOnlineChange() throws IOException {
String strPrefix = "hbase.hstore.compaction.";
Store s = r1.getStore(COLUMN_FAMILY1);
if (!(s instanceof HStore)) {
LOG.error("Can't test the compaction configuration of HStore class. "
+ "Got a different implementation other than HStore");
return;
}
HStore hstore = (HStore)s;
// Set the new compaction ratio to a different value.
double newCompactionRatio =
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatio() + 0.1;
conf.setFloat(strPrefix + "ratio", (float)newCompactionRatio);
// Notify all the observers, which includes the Store object.
rs1.getConfigurationManager().notifyAllObservers(conf);
// Check if the compaction ratio got updated in the Compaction Configuration
assertEquals(newCompactionRatio,
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatio(),
0.00001);
// Check if the off peak compaction ratio gets updated.
double newOffPeakCompactionRatio =
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatioOffPeak() + 0.1;
conf.setFloat(strPrefix + "ratio.offpeak",
(float)newOffPeakCompactionRatio);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newOffPeakCompactionRatio,
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatioOffPeak(),
0.00001);
// Check if the throttle point gets updated.
long newThrottlePoint =
hstore.getStoreEngine().getCompactionPolicy().getConf().getThrottlePoint() + 10;
conf.setLong("hbase.regionserver.thread.compaction.throttle",
newThrottlePoint);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newThrottlePoint,
hstore.getStoreEngine().getCompactionPolicy().getConf().getThrottlePoint());
// Check if the minFilesToCompact gets updated.
int newMinFilesToCompact =
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinFilesToCompact() + 1;
conf.setLong(strPrefix + "min", newMinFilesToCompact);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newMinFilesToCompact,
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinFilesToCompact());
// Check if the maxFilesToCompact gets updated.
int newMaxFilesToCompact =
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact() + 1;
conf.setLong(strPrefix + "max", newMaxFilesToCompact);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newMaxFilesToCompact,
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
// Check OffPeak hours is updated in an online fashion.
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, 6);
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, 7);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertFalse(hstore.getOffPeakHours().isOffPeakHour(4));
// Check if the minCompactSize gets updated.
long newMinCompactSize =
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinCompactSize() + 1;
conf.setLong(strPrefix + "min.size", newMinCompactSize);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newMinCompactSize,
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinCompactSize());
// Check if the maxCompactSize gets updated.
long newMaxCompactSize =
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxCompactSize() - 1;
conf.setLong(strPrefix + "max.size", newMaxCompactSize);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newMaxCompactSize,
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxCompactSize());
// Check if majorCompactionPeriod gets updated.
long newMajorCompactionPeriod =
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionPeriod() + 10;
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, newMajorCompactionPeriod);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newMajorCompactionPeriod,
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionPeriod());
// Check if majorCompactionJitter gets updated.
float newMajorCompactionJitter =
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionJitter() + 0.02F;
conf.setFloat("hbase.hregion.majorcompaction.jitter",
newMajorCompactionJitter);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newMajorCompactionJitter,
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionJitter(), 0.00001);
}
}

View File

@ -62,16 +62,16 @@ public class TestOffPeakHours {
@Test
public void testSetPeakHourToTargetTime() {
conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, hourMinusOne);
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, hourPlusOne);
OffPeakHours target = OffPeakHours.getInstance(conf);
assertTrue(target.isOffPeakHour(hourOfDay));
}
@Test
public void testSetPeakHourOutsideCurrentSelection() {
conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, hourMinusTwo);
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, hourMinusOne);
OffPeakHours target = OffPeakHours.getInstance(conf);
assertFalse(target.isOffPeakHour(hourOfDay));
}