HBASE-12147 Porting Online Config Change from 89-fb -- REAPPLY
This commit is contained in:
parent
921d331fa3
commit
dbd0ba36f5
|
@ -0,0 +1,139 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.conf;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.WeakHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maintains the set of all the classes which would like to get notified
|
||||||
|
* when the Configuration is reloaded from the disk in the Online Configuration
|
||||||
|
* Change mechanism, which lets you update certain configuration properties
|
||||||
|
* on-the-fly, without having to restart the cluster.
|
||||||
|
*
|
||||||
|
* If a class has configuration properties which you would like to be able to
|
||||||
|
* change on-the-fly, do the following:
|
||||||
|
* 1. Implement the {@link ConfigurationObserver} interface. This would require
|
||||||
|
* you to implement the
|
||||||
|
* {@link ConfigurationObserver#onConfigurationChange(Configuration)}
|
||||||
|
* method. This is a callback that is used to notify your class' instance
|
||||||
|
* that the configuration has changed. In this method, you need to check
|
||||||
|
* if the new values for the properties that are of interest to your class
|
||||||
|
* are different from the cached values. If yes, update them.
|
||||||
|
*
|
||||||
|
* However, be careful with this. Certain properties might be trivially
|
||||||
|
* mutable online, but others might not. Two properties might be trivially
|
||||||
|
* mutable by themselves, but not when changed together. For example, if a
|
||||||
|
* method uses properties "a" and "b" to make some decision, and is running
|
||||||
|
* in parallel when the notifyOnChange() method updates "a", but hasn't
|
||||||
|
* yet updated "b", it might make a decision on the basis of a new value of
|
||||||
|
* "a", and an old value of "b". This might introduce subtle bugs. This
|
||||||
|
* needs to be dealt on a case-by-case basis, and this class does not provide
|
||||||
|
* any protection from such cases.
|
||||||
|
*
|
||||||
|
* 2. Register the appropriate instance of the class with the
|
||||||
|
* {@link ConfigurationManager} instance, using the
|
||||||
|
* {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
|
||||||
|
* method. For the RS side of things, the ConfigurationManager is a static
|
||||||
|
* member of the {@link org.apache.hadoop.hbase.regionserver.HRegionServer}
|
||||||
|
* class. Be careful not to do this in the constructor, as you might cause
|
||||||
|
* the 'this' reference to escape. Use a factory method, or an initialize()
|
||||||
|
* method which is called after the construction of the object.
|
||||||
|
*
|
||||||
|
* 3. Deregister the instance using the
|
||||||
|
* {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
|
||||||
|
* method when it is going out of scope. In case you are not able to do that
|
||||||
|
* for any reason, it is still okay, since entries for dead observers are
|
||||||
|
* automatically collected during GC. But nonetheless, it is still a good
|
||||||
|
* practice to deregister your observer, whenever possible.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ConfigurationManager {
|
||||||
|
public static final Log LOG = LogFactory.getLog(ConfigurationManager.class);
|
||||||
|
|
||||||
|
// The set of Configuration Observers. These classes would like to get
|
||||||
|
// notified when the configuration is reloaded from disk. This is a set
|
||||||
|
// constructed from a WeakHashMap, whose entries would be removed if the
|
||||||
|
// observer classes go out of scope.
|
||||||
|
private Set<ConfigurationObserver> configurationObservers =
|
||||||
|
Collections.newSetFromMap(new WeakHashMap<ConfigurationObserver,
|
||||||
|
Boolean>());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register an observer class
|
||||||
|
* @param observer
|
||||||
|
*/
|
||||||
|
public void registerObserver(ConfigurationObserver observer) {
|
||||||
|
synchronized (configurationObservers) {
|
||||||
|
configurationObservers.add(observer);
|
||||||
|
if (observer instanceof PropagatingConfigurationObserver) {
|
||||||
|
((PropagatingConfigurationObserver) observer).registerChildren(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deregister an observer class
|
||||||
|
* @param observer
|
||||||
|
*/
|
||||||
|
public void deregisterObserver(ConfigurationObserver observer) {
|
||||||
|
synchronized (configurationObservers) {
|
||||||
|
configurationObservers.remove(observer);
|
||||||
|
if (observer instanceof PropagatingConfigurationObserver) {
|
||||||
|
((PropagatingConfigurationObserver) observer).deregisterChildren(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The conf object has been repopulated from disk, and we have to notify
|
||||||
|
* all the observers that are expressed interest to do that.
|
||||||
|
*/
|
||||||
|
public void notifyAllObservers(Configuration conf) {
|
||||||
|
synchronized (configurationObservers) {
|
||||||
|
for (ConfigurationObserver observer : configurationObservers) {
|
||||||
|
try {
|
||||||
|
if (observer != null) {
|
||||||
|
observer.onConfigurationChange(conf);
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Encountered a throwable while notifying observers: " + " of type : " +
|
||||||
|
observer.getClass().getCanonicalName() + "(" + observer + ")", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of observers.
|
||||||
|
*/
|
||||||
|
public int getNumObservers() {
|
||||||
|
synchronized (configurationObservers) {
|
||||||
|
return configurationObservers.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Every class that wants to observe changes in Configuration properties,
|
||||||
|
* must implement interface (and also, register itself with the
|
||||||
|
* <code>ConfigurationManager</code> object.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface ConfigurationObserver {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method would be called by the {@link ConfigurationManager}
|
||||||
|
* object when the {@link Configuration} object is reloaded from disk.
|
||||||
|
*/
|
||||||
|
void onConfigurationChange(Configuration conf);
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This extension to ConfigurationObserver allows the configuration to propagate to the children of
|
||||||
|
* the current {@link ConfigurationObserver}. This is the preferred way to make a class online
|
||||||
|
* configurable because it allows the user to configure the children in a recursive manner
|
||||||
|
* automatically.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface PropagatingConfigurationObserver extends ConfigurationObserver {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Needs to be called to register the children to the manager.
|
||||||
|
* @param manager : to register to
|
||||||
|
*/
|
||||||
|
void registerChildren(ConfigurationManager manager);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Needs to be called to deregister the children from the manager.
|
||||||
|
* @param manager : to deregister from
|
||||||
|
*/
|
||||||
|
void deregisterChildren(ConfigurationManager manager);
|
||||||
|
}
|
|
@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
|
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
|
@ -50,9 +52,27 @@ import com.google.common.base.Preconditions;
|
||||||
* Compact region on request and then run split if appropriate
|
* Compact region on request and then run split if appropriate
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class CompactSplitThread implements CompactionRequestor {
|
public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
|
||||||
static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
|
static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
|
||||||
|
|
||||||
|
// Configuration key for the large compaction threads.
|
||||||
|
public final static String LARGE_COMPACTION_THREADS =
|
||||||
|
"hbase.regionserver.thread.compaction.large";
|
||||||
|
public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
|
||||||
|
|
||||||
|
// Configuration key for the small compaction threads.
|
||||||
|
public final static String SMALL_COMPACTION_THREADS =
|
||||||
|
"hbase.regionserver.thread.compaction.small";
|
||||||
|
public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
|
||||||
|
|
||||||
|
// Configuration key for split threads
|
||||||
|
public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
|
||||||
|
public final static int SPLIT_THREADS_DEFAULT = 1;
|
||||||
|
|
||||||
|
// Configuration keys for merge threads
|
||||||
|
public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
|
||||||
|
public final static int MERGE_THREADS_DEFAULT = 1;
|
||||||
|
|
||||||
private final HRegionServer server;
|
private final HRegionServer server;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
|
||||||
|
@ -77,11 +97,11 @@ public class CompactSplitThread implements CompactionRequestor {
|
||||||
Integer.MAX_VALUE);
|
Integer.MAX_VALUE);
|
||||||
|
|
||||||
int largeThreads = Math.max(1, conf.getInt(
|
int largeThreads = Math.max(1, conf.getInt(
|
||||||
"hbase.regionserver.thread.compaction.large", 1));
|
LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
|
||||||
int smallThreads = conf.getInt(
|
int smallThreads = conf.getInt(
|
||||||
"hbase.regionserver.thread.compaction.small", 1);
|
SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
|
||||||
|
|
||||||
int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
|
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
||||||
|
|
||||||
// if we have throttle threads, make sure the user also specified size
|
// if we have throttle threads, make sure the user also specified size
|
||||||
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
|
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
|
||||||
|
@ -121,7 +141,7 @@ public class CompactSplitThread implements CompactionRequestor {
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
|
int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
|
||||||
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
|
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
|
||||||
mergeThreads, new ThreadFactory() {
|
mergeThreads, new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -147,7 +167,7 @@ public class CompactSplitThread implements CompactionRequestor {
|
||||||
queueLists.append("Compaction/Split Queue dump:\n");
|
queueLists.append("Compaction/Split Queue dump:\n");
|
||||||
queueLists.append(" LargeCompation Queue:\n");
|
queueLists.append(" LargeCompation Queue:\n");
|
||||||
BlockingQueue<Runnable> lq = longCompactions.getQueue();
|
BlockingQueue<Runnable> lq = longCompactions.getQueue();
|
||||||
Iterator it = lq.iterator();
|
Iterator<Runnable> it = lq.iterator();
|
||||||
while(it.hasNext()){
|
while(it.hasNext()){
|
||||||
queueLists.append(" "+it.next().toString());
|
queueLists.append(" "+it.next().toString());
|
||||||
queueLists.append("\n");
|
queueLists.append("\n");
|
||||||
|
@ -539,4 +559,84 @@ public class CompactSplitThread implements CompactionRequestor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration newConf) {
|
||||||
|
// Check if number of large / small compaction threads has changed, and then
|
||||||
|
// adjust the core pool size of the thread pools, by using the
|
||||||
|
// setCorePoolSize() method. According to the javadocs, it is safe to
|
||||||
|
// change the core pool size on-the-fly. We need to reset the maximum
|
||||||
|
// pool size, as well.
|
||||||
|
int largeThreads = Math.max(1, newConf.getInt(
|
||||||
|
LARGE_COMPACTION_THREADS,
|
||||||
|
LARGE_COMPACTION_THREADS_DEFAULT));
|
||||||
|
if (this.longCompactions.getCorePoolSize() != largeThreads) {
|
||||||
|
LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
|
||||||
|
" from " + this.longCompactions.getCorePoolSize() + " to " +
|
||||||
|
largeThreads);
|
||||||
|
this.longCompactions.setMaximumPoolSize(largeThreads);
|
||||||
|
this.longCompactions.setCorePoolSize(largeThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
|
||||||
|
SMALL_COMPACTION_THREADS_DEFAULT);
|
||||||
|
if (this.shortCompactions.getCorePoolSize() != smallThreads) {
|
||||||
|
LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
|
||||||
|
" from " + this.shortCompactions.getCorePoolSize() + " to " +
|
||||||
|
smallThreads);
|
||||||
|
this.shortCompactions.setMaximumPoolSize(smallThreads);
|
||||||
|
this.shortCompactions.setCorePoolSize(smallThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
int splitThreads = newConf.getInt(SPLIT_THREADS,
|
||||||
|
SPLIT_THREADS_DEFAULT);
|
||||||
|
if (this.splits.getCorePoolSize() != splitThreads) {
|
||||||
|
LOG.info("Changing the value of " + SPLIT_THREADS +
|
||||||
|
" from " + this.splits.getCorePoolSize() + " to " +
|
||||||
|
splitThreads);
|
||||||
|
this.splits.setMaximumPoolSize(smallThreads);
|
||||||
|
this.splits.setCorePoolSize(smallThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
int mergeThreads = newConf.getInt(MERGE_THREADS,
|
||||||
|
MERGE_THREADS_DEFAULT);
|
||||||
|
if (this.mergePool.getCorePoolSize() != mergeThreads) {
|
||||||
|
LOG.info("Changing the value of " + MERGE_THREADS +
|
||||||
|
" from " + this.mergePool.getCorePoolSize() + " to " +
|
||||||
|
mergeThreads);
|
||||||
|
this.mergePool.setMaximumPoolSize(smallThreads);
|
||||||
|
this.mergePool.setCorePoolSize(smallThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We change this atomically here instead of reloading the config in order that upstream
|
||||||
|
// would be the only one with the flexibility to reload the config.
|
||||||
|
this.conf.reloadConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getSmallCompactionThreadNum() {
|
||||||
|
return this.shortCompactions.getCorePoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getLargeCompactionThreadNum() {
|
||||||
|
return this.longCompactions.getCorePoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void registerChildren(ConfigurationManager manager) {
|
||||||
|
// No children to register.
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deregisterChildren(ConfigurationManager manager) {
|
||||||
|
// No children to register
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,6 +97,8 @@ import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
|
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
||||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||||
|
@ -149,6 +151,7 @@ import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
@ -196,7 +199,7 @@ import com.google.protobuf.Service;
|
||||||
* defines the keyspace for this HRegion.
|
* defines the keyspace for this HRegion.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class HRegion implements HeapSize { // , Writable{
|
public class HRegion implements HeapSize, PropagatingConfigurationObserver { // , Writable{
|
||||||
public static final Log LOG = LogFactory.getLog(HRegion.class);
|
public static final Log LOG = LogFactory.getLog(HRegion.class);
|
||||||
|
|
||||||
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
|
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
|
||||||
|
@ -347,6 +350,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// when a region is in recovering state, it can only accept writes not reads
|
// when a region is in recovering state, it can only accept writes not reads
|
||||||
private volatile boolean isRecovering = false;
|
private volatile boolean isRecovering = false;
|
||||||
|
|
||||||
|
private volatile Optional<ConfigurationManager> configurationManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The smallest mvcc readPoint across all the scanners in this
|
* @return The smallest mvcc readPoint across all the scanners in this
|
||||||
* region. Writes older than this readPoint, are included in every
|
* region. Writes older than this readPoint, are included in every
|
||||||
|
@ -665,6 +670,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.disallowWritesInRecovering =
|
this.disallowWritesInRecovering =
|
||||||
conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
|
conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
|
||||||
HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
|
HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
|
||||||
|
configurationManager = Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
void setHTableSpecificConf() {
|
void setHTableSpecificConf() {
|
||||||
|
@ -5753,7 +5759,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
ClassSize.OBJECT +
|
ClassSize.OBJECT +
|
||||||
ClassSize.ARRAY +
|
ClassSize.ARRAY +
|
||||||
40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||||
(12 * Bytes.SIZEOF_LONG) +
|
(12 * Bytes.SIZEOF_LONG) +
|
||||||
4 * Bytes.SIZEOF_BOOLEAN);
|
4 * Bytes.SIZEOF_BOOLEAN);
|
||||||
|
|
||||||
|
@ -6485,4 +6491,33 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.log.sync();
|
this.log.sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
// Do nothing for now.
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void registerChildren(ConfigurationManager manager) {
|
||||||
|
configurationManager = Optional.of(manager);
|
||||||
|
for (Store s : this.stores.values()) {
|
||||||
|
configurationManager.get().registerObserver(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deregisterChildren(ConfigurationManager manager) {
|
||||||
|
for (Store s : this.stores.values()) {
|
||||||
|
configurationManager.get().deregisterObserver(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||||
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
|
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
|
||||||
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
||||||
|
@ -430,6 +432,12 @@ public class HRegionServer extends HasThread implements
|
||||||
|
|
||||||
private final boolean useZKForAssignment;
|
private final boolean useZKForAssignment;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration manager is used to register/deregister and notify the configuration observers
|
||||||
|
* when the regionserver is notified that there was a change in the on disk configs.
|
||||||
|
*/
|
||||||
|
private final ConfigurationManager configurationManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts a HRegionServer at the default location.
|
* Starts a HRegionServer at the default location.
|
||||||
* @param conf
|
* @param conf
|
||||||
|
@ -531,6 +539,7 @@ public class HRegionServer extends HasThread implements
|
||||||
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
|
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
|
||||||
clusterStatusTracker.start();
|
clusterStatusTracker.start();
|
||||||
}
|
}
|
||||||
|
this.configurationManager = new ConfigurationManager();
|
||||||
|
|
||||||
rpcServices.start();
|
rpcServices.start();
|
||||||
putUpWebUI();
|
putUpWebUI();
|
||||||
|
@ -763,6 +772,12 @@ public class HRegionServer extends HasThread implements
|
||||||
if (storefileRefreshPeriod > 0) {
|
if (storefileRefreshPeriod > 0) {
|
||||||
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
|
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
|
||||||
}
|
}
|
||||||
|
registerConfigurationObservers();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerConfigurationObservers() {
|
||||||
|
// Registering the compactSplitThread object with the ConfigurationManager.
|
||||||
|
configurationManager.registerObserver(this.compactSplitThread);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2249,6 +2264,7 @@ public class HRegionServer extends HasThread implements
|
||||||
@Override
|
@Override
|
||||||
public void addToOnlineRegions(HRegion region) {
|
public void addToOnlineRegions(HRegion region) {
|
||||||
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
|
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
|
||||||
|
configurationManager.registerObserver(region);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3047,4 +3063,12 @@ public class HRegionServer extends HasThread implements
|
||||||
public CacheConfig getCacheConfig() {
|
public CacheConfig getCacheConfig() {
|
||||||
return this.cacheConfig;
|
return this.cacheConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return : Returns the ConfigurationManager object for testing purposes.
|
||||||
|
*/
|
||||||
|
protected ConfigurationManager getConfigurationManager() {
|
||||||
|
return configurationManager;
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
||||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||||
|
@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableCollection;
|
import com.google.common.collect.ImmutableCollection;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
@ -135,7 +137,7 @@ public class HStore implements Store {
|
||||||
private final HRegion region;
|
private final HRegion region;
|
||||||
private final HColumnDescriptor family;
|
private final HColumnDescriptor family;
|
||||||
private final HRegionFileSystem fs;
|
private final HRegionFileSystem fs;
|
||||||
private final Configuration conf;
|
private Configuration conf;
|
||||||
private final CacheConfig cacheConf;
|
private final CacheConfig cacheConf;
|
||||||
private long lastCompactSize = 0;
|
private long lastCompactSize = 0;
|
||||||
volatile boolean forceMajor = false;
|
volatile boolean forceMajor = false;
|
||||||
|
@ -178,7 +180,7 @@ public class HStore implements Store {
|
||||||
final StoreEngine<?, ?, ?, ?> storeEngine;
|
final StoreEngine<?, ?, ?, ?> storeEngine;
|
||||||
|
|
||||||
private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
|
private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
|
||||||
private final OffPeakHours offPeakHours;
|
private volatile OffPeakHours offPeakHours;
|
||||||
|
|
||||||
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
|
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
|
||||||
private int flushRetriesNumber;
|
private int flushRetriesNumber;
|
||||||
|
@ -2201,4 +2203,44 @@ public class HStore implements Store {
|
||||||
public long getMajorCompactedCellsSize() {
|
public long getMajorCompactedCellsSize() {
|
||||||
return majorCompactedCellsSize;
|
return majorCompactedCellsSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the StoreEngine that is backing this concrete implementation of Store.
|
||||||
|
* @return Returns the {@link StoreEngine} object used internally inside this HStore object.
|
||||||
|
*/
|
||||||
|
protected StoreEngine<?, ?, ?, ?> getStoreEngine() {
|
||||||
|
return this.storeEngine;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected OffPeakHours getOffPeakHours() {
|
||||||
|
return this.offPeakHours;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
this.conf = new CompoundConfiguration()
|
||||||
|
.add(conf)
|
||||||
|
.addWritableMap(family.getValues());
|
||||||
|
this.storeEngine.compactionPolicy.setConf(conf);
|
||||||
|
this.offPeakHours = OffPeakHours.getInstance(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void registerChildren(ConfigurationManager manager) {
|
||||||
|
// No children to register
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deregisterChildren(ConfigurationManager manager) {
|
||||||
|
// No children to deregister
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
|
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.io.HeapSize;
|
import org.apache.hadoop.hbase.io.HeapSize;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
|
@ -48,7 +50,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public interface Store extends HeapSize, StoreConfigInformation {
|
public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
|
||||||
|
|
||||||
/* The default priority for user-specified compaction requests.
|
/* The default priority for user-specified compaction requests.
|
||||||
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
|
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
* Set parameter as "hbase.hstore.compaction.<attribute>"
|
* Set parameter as "hbase.hstore.compaction.<attribute>"
|
||||||
*/
|
*/
|
||||||
|
|
||||||
//TODO: revisit this class for online parameter updating (both in xml and on the CF)
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class CompactionConfiguration {
|
public class CompactionConfiguration {
|
||||||
|
|
||||||
|
@ -55,19 +54,22 @@ public class CompactionConfiguration {
|
||||||
public static final String HBASE_HSTORE_COMPACTION_MAX_KEY = "hbase.hstore.compaction.max";
|
public static final String HBASE_HSTORE_COMPACTION_MAX_KEY = "hbase.hstore.compaction.max";
|
||||||
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY =
|
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY =
|
||||||
"hbase.hstore.compaction.max.size";
|
"hbase.hstore.compaction.max.size";
|
||||||
|
public static final String HBASE_HSTORE_OFFPEAK_END_HOUR = "hbase.offpeak.end.hour";
|
||||||
|
public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour";
|
||||||
|
|
||||||
Configuration conf;
|
Configuration conf;
|
||||||
StoreConfigInformation storeConfigInfo;
|
StoreConfigInformation storeConfigInfo;
|
||||||
|
|
||||||
long maxCompactSize;
|
private final double offPeakCompactionRatio;
|
||||||
long minCompactSize;
|
/** Since all these properties can change online, they are volatile **/
|
||||||
int minFilesToCompact;
|
private final long maxCompactSize;
|
||||||
int maxFilesToCompact;
|
private final long minCompactSize;
|
||||||
double compactionRatio;
|
private final int minFilesToCompact;
|
||||||
double offPeekCompactionRatio;
|
private final int maxFilesToCompact;
|
||||||
long throttlePoint;
|
private final double compactionRatio;
|
||||||
long majorCompactionPeriod;
|
private final long throttlePoint;
|
||||||
float majorCompactionJitter;
|
private final long majorCompactionPeriod;
|
||||||
|
private final float majorCompactionJitter;
|
||||||
|
|
||||||
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
@ -80,9 +82,9 @@ public class CompactionConfiguration {
|
||||||
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||||
maxFilesToCompact = conf.getInt(HBASE_HSTORE_COMPACTION_MAX_KEY, 10);
|
maxFilesToCompact = conf.getInt(HBASE_HSTORE_COMPACTION_MAX_KEY, 10);
|
||||||
compactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
|
compactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
|
||||||
offPeekCompactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_OFFPEAK_KEY, 5.0F);
|
offPeakCompactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_OFFPEAK_KEY, 5.0F);
|
||||||
|
|
||||||
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
|
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
|
||||||
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
|
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
|
||||||
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
|
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
|
||||||
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
|
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
|
||||||
|
@ -101,7 +103,7 @@ public class CompactionConfiguration {
|
||||||
minFilesToCompact,
|
minFilesToCompact,
|
||||||
maxFilesToCompact,
|
maxFilesToCompact,
|
||||||
compactionRatio,
|
compactionRatio,
|
||||||
offPeekCompactionRatio,
|
offPeakCompactionRatio,
|
||||||
throttlePoint,
|
throttlePoint,
|
||||||
majorCompactionPeriod,
|
majorCompactionPeriod,
|
||||||
majorCompactionJitter);
|
majorCompactionJitter);
|
||||||
|
@ -110,49 +112,49 @@ public class CompactionConfiguration {
|
||||||
/**
|
/**
|
||||||
* @return lower bound below which compaction is selected without ratio test
|
* @return lower bound below which compaction is selected without ratio test
|
||||||
*/
|
*/
|
||||||
long getMinCompactSize() {
|
public long getMinCompactSize() {
|
||||||
return minCompactSize;
|
return minCompactSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return upper bound on file size to be included in minor compactions
|
* @return upper bound on file size to be included in minor compactions
|
||||||
*/
|
*/
|
||||||
long getMaxCompactSize() {
|
public long getMaxCompactSize() {
|
||||||
return maxCompactSize;
|
return maxCompactSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return upper bound on number of files to be included in minor compactions
|
* @return upper bound on number of files to be included in minor compactions
|
||||||
*/
|
*/
|
||||||
int getMinFilesToCompact() {
|
public int getMinFilesToCompact() {
|
||||||
return minFilesToCompact;
|
return minFilesToCompact;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return upper bound on number of files to be included in minor compactions
|
* @return upper bound on number of files to be included in minor compactions
|
||||||
*/
|
*/
|
||||||
int getMaxFilesToCompact() {
|
public int getMaxFilesToCompact() {
|
||||||
return maxFilesToCompact;
|
return maxFilesToCompact;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Ratio used for compaction
|
* @return Ratio used for compaction
|
||||||
*/
|
*/
|
||||||
double getCompactionRatio() {
|
public double getCompactionRatio() {
|
||||||
return compactionRatio;
|
return compactionRatio;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Off peak Ratio used for compaction
|
* @return Off peak Ratio used for compaction
|
||||||
*/
|
*/
|
||||||
double getCompactionRatioOffPeak() {
|
public double getCompactionRatioOffPeak() {
|
||||||
return offPeekCompactionRatio;
|
return offPeakCompactionRatio;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return ThrottlePoint used for classifying small and large compactions
|
* @return ThrottlePoint used for classifying small and large compactions
|
||||||
*/
|
*/
|
||||||
long getThrottlePoint() {
|
public long getThrottlePoint() {
|
||||||
return throttlePoint;
|
return throttlePoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +162,7 @@ public class CompactionConfiguration {
|
||||||
* @return Major compaction period from compaction.
|
* @return Major compaction period from compaction.
|
||||||
* Major compactions are selected periodically according to this parameter plus jitter
|
* Major compactions are selected periodically according to this parameter plus jitter
|
||||||
*/
|
*/
|
||||||
long getMajorCompactionPeriod() {
|
public long getMajorCompactionPeriod() {
|
||||||
return majorCompactionPeriod;
|
return majorCompactionPeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,7 +170,7 @@ public class CompactionConfiguration {
|
||||||
* @return Major the jitter fraction, the fraction within which the major compaction
|
* @return Major the jitter fraction, the fraction within which the major compaction
|
||||||
* period is randomly chosen from the majorCompactionPeriod in each store.
|
* period is randomly chosen from the majorCompactionPeriod in each store.
|
||||||
*/
|
*/
|
||||||
float getMajorCompactionJitter() {
|
public float getMajorCompactionJitter() {
|
||||||
return majorCompactionJitter;
|
return majorCompactionJitter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -62,4 +61,11 @@ public abstract class CompactionPolicy {
|
||||||
public void setConf(Configuration conf) {
|
public void setConf(Configuration conf) {
|
||||||
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
|
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The current compaction configuration settings.
|
||||||
|
*/
|
||||||
|
public CompactionConfiguration getConf() {
|
||||||
|
return this.comConf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,8 +32,8 @@ public abstract class OffPeakHours {
|
||||||
};
|
};
|
||||||
|
|
||||||
public static OffPeakHours getInstance(Configuration conf) {
|
public static OffPeakHours getInstance(Configuration conf) {
|
||||||
int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
|
int startHour = conf.getInt(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, -1);
|
||||||
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);
|
int endHour = conf.getInt(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, -1);
|
||||||
return getInstance(startHour, endHour);
|
return getInstance(startHour, endHour);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.conf;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestConfigurationManager extends TestCase {
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestConfigurationManager.class);
|
||||||
|
|
||||||
|
class DummyConfigurationObserver implements ConfigurationObserver {
|
||||||
|
private boolean notifiedOnChange = false;
|
||||||
|
private ConfigurationManager cm;
|
||||||
|
|
||||||
|
public DummyConfigurationObserver(ConfigurationManager cm) {
|
||||||
|
this.cm = cm;
|
||||||
|
register();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
notifiedOnChange = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Was the observer notified on Configuration change?
|
||||||
|
public boolean wasNotifiedOnChange() {
|
||||||
|
return notifiedOnChange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetNotifiedOnChange() {
|
||||||
|
notifiedOnChange = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void register() {
|
||||||
|
this.cm.registerObserver(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deregister() {
|
||||||
|
this.cm.deregisterObserver(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if observers get notified by the <code>ConfigurationManager</code>
|
||||||
|
* when the Configuration is reloaded.
|
||||||
|
*/
|
||||||
|
public void testCheckIfObserversNotified() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
ConfigurationManager cm = new ConfigurationManager();
|
||||||
|
DummyConfigurationObserver d1 = new DummyConfigurationObserver(cm);
|
||||||
|
|
||||||
|
// Check if we get notified.
|
||||||
|
cm.notifyAllObservers(conf);
|
||||||
|
assertTrue(d1.wasNotifiedOnChange());
|
||||||
|
d1.resetNotifiedOnChange();
|
||||||
|
|
||||||
|
// Now check if we get notified on change with more than one observers.
|
||||||
|
DummyConfigurationObserver d2 = new DummyConfigurationObserver(cm);
|
||||||
|
cm.notifyAllObservers(conf);
|
||||||
|
assertTrue(d1.wasNotifiedOnChange());
|
||||||
|
d1.resetNotifiedOnChange();
|
||||||
|
assertTrue(d2.wasNotifiedOnChange());
|
||||||
|
d2.resetNotifiedOnChange();
|
||||||
|
|
||||||
|
// Now try deregistering an observer and verify that it was not notified
|
||||||
|
d2.deregister();
|
||||||
|
cm.notifyAllObservers(conf);
|
||||||
|
assertTrue(d1.wasNotifiedOnChange());
|
||||||
|
d1.resetNotifiedOnChange();
|
||||||
|
assertFalse(d2.wasNotifiedOnChange());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register an observer that will go out of scope immediately, allowing
|
||||||
|
// us to test that out of scope observers are deregistered.
|
||||||
|
private void registerLocalObserver(ConfigurationManager cm) {
|
||||||
|
new DummyConfigurationObserver(cm);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if out-of-scope observers are deregistered on GC.
|
||||||
|
*/
|
||||||
|
public void testDeregisterOnOutOfScope() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
ConfigurationManager cm = new ConfigurationManager();
|
||||||
|
|
||||||
|
boolean outOfScopeObserversDeregistered = false;
|
||||||
|
|
||||||
|
// On my machine, I was able to cause a GC after around 5 iterations.
|
||||||
|
// If we do not cause a GC in 100k iterations, which is very unlikely,
|
||||||
|
// there might be something wrong with the GC.
|
||||||
|
for (int i = 0; i < 100000; i++) {
|
||||||
|
registerLocalObserver(cm);
|
||||||
|
cm.notifyAllObservers(conf);
|
||||||
|
|
||||||
|
// 'Suggest' the system to do a GC. We should be able to cause GC
|
||||||
|
// atleast once in the 2000 iterations.
|
||||||
|
System.gc();
|
||||||
|
|
||||||
|
// If GC indeed happened, all the observers (which are all out of scope),
|
||||||
|
// should have been deregistered.
|
||||||
|
if (cm.getNumObservers() <= i) {
|
||||||
|
outOfScopeObserversDeregistered = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!outOfScopeObserversDeregistered) {
|
||||||
|
LOG.warn("Observers were not GC-ed! Something seems to be wrong.");
|
||||||
|
}
|
||||||
|
assertTrue(outOfScopeObserversDeregistered);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,205 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that the Online config Changes on the HRegionServer side are actually
|
||||||
|
* happening. We should add tests for important configurations which will be
|
||||||
|
* changed online.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Category({MediumTests.class})
|
||||||
|
public class TestRegionServerOnlineConfigChange extends TestCase {
|
||||||
|
static final Log LOG =
|
||||||
|
LogFactory.getLog(TestRegionServerOnlineConfigChange.class.getName());
|
||||||
|
HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
|
||||||
|
Configuration conf = null;
|
||||||
|
|
||||||
|
HTable t1 = null;
|
||||||
|
HRegionServer rs1 = null;
|
||||||
|
byte[] r1name = null;
|
||||||
|
HRegion r1 = null;
|
||||||
|
|
||||||
|
final String table1Str = "table1";
|
||||||
|
final String columnFamily1Str = "columnFamily1";
|
||||||
|
final byte[] TABLE1 = Bytes.toBytes(table1Str);
|
||||||
|
final byte[] COLUMN_FAMILY1 = Bytes.toBytes(columnFamily1Str);
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf = hbaseTestingUtility.getConfiguration();
|
||||||
|
hbaseTestingUtility.startMiniCluster(1,1);
|
||||||
|
t1 = hbaseTestingUtility.createTable(TABLE1, COLUMN_FAMILY1);
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
HRegionInfo firstHRI = t1.getRegionLocations().keySet().iterator().next();
|
||||||
|
r1name = firstHRI.getRegionName();
|
||||||
|
rs1 = hbaseTestingUtility.getHBaseCluster().getRegionServer(
|
||||||
|
hbaseTestingUtility.getHBaseCluster().getServerWith(r1name));
|
||||||
|
r1 = rs1.getRegion(r1name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
hbaseTestingUtility.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the number of compaction threads changes online
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void testNumCompactionThreadsOnlineChange() throws IOException {
|
||||||
|
assertTrue(rs1.compactSplitThread != null);
|
||||||
|
int newNumSmallThreads =
|
||||||
|
rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
|
||||||
|
int newNumLargeThreads =
|
||||||
|
rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
|
||||||
|
|
||||||
|
conf.setInt("hbase.regionserver.thread.compaction.small",
|
||||||
|
newNumSmallThreads);
|
||||||
|
conf.setInt("hbase.regionserver.thread.compaction.large",
|
||||||
|
newNumLargeThreads);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
|
||||||
|
assertEquals(newNumSmallThreads,
|
||||||
|
rs1.compactSplitThread.getSmallCompactionThreadNum());
|
||||||
|
assertEquals(newNumLargeThreads,
|
||||||
|
rs1.compactSplitThread.getLargeCompactionThreadNum());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the configurations in the CompactionConfiguration class change
|
||||||
|
* properly.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void testCompactionConfigurationOnlineChange() throws IOException {
|
||||||
|
String strPrefix = "hbase.hstore.compaction.";
|
||||||
|
Store s = r1.getStore(COLUMN_FAMILY1);
|
||||||
|
if (!(s instanceof HStore)) {
|
||||||
|
LOG.error("Can't test the compaction configuration of HStore class. "
|
||||||
|
+ "Got a different implementation other than HStore");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
HStore hstore = (HStore)s;
|
||||||
|
|
||||||
|
// Set the new compaction ratio to a different value.
|
||||||
|
double newCompactionRatio =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatio() + 0.1;
|
||||||
|
conf.setFloat(strPrefix + "ratio", (float)newCompactionRatio);
|
||||||
|
|
||||||
|
// Notify all the observers, which includes the Store object.
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
|
||||||
|
// Check if the compaction ratio got updated in the Compaction Configuration
|
||||||
|
assertEquals(newCompactionRatio,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatio(),
|
||||||
|
0.00001);
|
||||||
|
|
||||||
|
// Check if the off peak compaction ratio gets updated.
|
||||||
|
double newOffPeakCompactionRatio =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatioOffPeak() + 0.1;
|
||||||
|
conf.setFloat(strPrefix + "ratio.offpeak",
|
||||||
|
(float)newOffPeakCompactionRatio);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newOffPeakCompactionRatio,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatioOffPeak(),
|
||||||
|
0.00001);
|
||||||
|
|
||||||
|
// Check if the throttle point gets updated.
|
||||||
|
long newThrottlePoint =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getThrottlePoint() + 10;
|
||||||
|
conf.setLong("hbase.regionserver.thread.compaction.throttle",
|
||||||
|
newThrottlePoint);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newThrottlePoint,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getThrottlePoint());
|
||||||
|
|
||||||
|
// Check if the minFilesToCompact gets updated.
|
||||||
|
int newMinFilesToCompact =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinFilesToCompact() + 1;
|
||||||
|
conf.setLong(strPrefix + "min", newMinFilesToCompact);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newMinFilesToCompact,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinFilesToCompact());
|
||||||
|
|
||||||
|
// Check if the maxFilesToCompact gets updated.
|
||||||
|
int newMaxFilesToCompact =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact() + 1;
|
||||||
|
conf.setLong(strPrefix + "max", newMaxFilesToCompact);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newMaxFilesToCompact,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
|
||||||
|
|
||||||
|
// Check OffPeak hours is updated in an online fashion.
|
||||||
|
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, 6);
|
||||||
|
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, 7);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertFalse(hstore.getOffPeakHours().isOffPeakHour(4));
|
||||||
|
|
||||||
|
// Check if the minCompactSize gets updated.
|
||||||
|
long newMinCompactSize =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinCompactSize() + 1;
|
||||||
|
conf.setLong(strPrefix + "min.size", newMinCompactSize);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newMinCompactSize,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMinCompactSize());
|
||||||
|
|
||||||
|
// Check if the maxCompactSize gets updated.
|
||||||
|
long newMaxCompactSize =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxCompactSize() - 1;
|
||||||
|
conf.setLong(strPrefix + "max.size", newMaxCompactSize);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newMaxCompactSize,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxCompactSize());
|
||||||
|
|
||||||
|
// Check if majorCompactionPeriod gets updated.
|
||||||
|
long newMajorCompactionPeriod =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionPeriod() + 10;
|
||||||
|
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, newMajorCompactionPeriod);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newMajorCompactionPeriod,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionPeriod());
|
||||||
|
|
||||||
|
// Check if majorCompactionJitter gets updated.
|
||||||
|
float newMajorCompactionJitter =
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionJitter() + 0.02F;
|
||||||
|
conf.setFloat("hbase.hregion.majorcompaction.jitter",
|
||||||
|
newMajorCompactionJitter);
|
||||||
|
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||||
|
assertEquals(newMajorCompactionJitter,
|
||||||
|
hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionJitter(), 0.00001);
|
||||||
|
}
|
||||||
|
}
|
|
@ -61,16 +61,16 @@ public class TestOffPeakHours {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetPeakHourToTargetTime() {
|
public void testSetPeakHourToTargetTime() {
|
||||||
conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
|
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, hourMinusOne);
|
||||||
conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
|
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, hourPlusOne);
|
||||||
OffPeakHours target = OffPeakHours.getInstance(conf);
|
OffPeakHours target = OffPeakHours.getInstance(conf);
|
||||||
assertTrue(target.isOffPeakHour(hourOfDay));
|
assertTrue(target.isOffPeakHour(hourOfDay));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetPeakHourOutsideCurrentSelection() {
|
public void testSetPeakHourOutsideCurrentSelection() {
|
||||||
conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
|
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, hourMinusTwo);
|
||||||
conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
|
conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, hourMinusOne);
|
||||||
OffPeakHours target = OffPeakHours.getInstance(conf);
|
OffPeakHours target = OffPeakHours.getInstance(conf);
|
||||||
assertFalse(target.isOffPeakHour(hourOfDay));
|
assertFalse(target.isOffPeakHour(hourOfDay));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue