HBASE-8329 Limit compaction speed

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
zhangduo 2015-02-03 12:10:54 +08:00 committed by stack
parent da30c72b73
commit eb351b9ff8
33 changed files with 1111 additions and 159 deletions

View File

@ -35,17 +35,20 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@ -81,6 +84,8 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
private final ThreadPoolExecutor splits;
private final ThreadPoolExecutor mergePool;
private volatile CompactionThroughputController compactionThroughputController;
/**
* Splitting should not take place if the total number of regions exceed this.
* This is not a hard limit to the number of regions but it is a guideline to
@ -151,6 +156,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
return t;
}
});
// compaction throughput controller
this.compactionThroughputController =
CompactionThroughputControllerFactory.create(server, conf);
}
@Override
@ -168,31 +177,31 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
queueLists.append(" LargeCompation Queue:\n");
BlockingQueue<Runnable> lq = longCompactions.getQueue();
Iterator<Runnable> it = lq.iterator();
while(it.hasNext()){
queueLists.append(" "+it.next().toString());
while (it.hasNext()) {
queueLists.append(" " + it.next().toString());
queueLists.append("\n");
}
if( shortCompactions != null ){
if (shortCompactions != null) {
queueLists.append("\n");
queueLists.append(" SmallCompation Queue:\n");
lq = shortCompactions.getQueue();
it = lq.iterator();
while(it.hasNext()){
queueLists.append(" "+it.next().toString());
while (it.hasNext()) {
queueLists.append(" " + it.next().toString());
queueLists.append("\n");
}
}
queueLists.append("\n");
queueLists.append(" Split Queue:\n");
lq = splits.getQueue();
it = lq.iterator();
while(it.hasNext()){
queueLists.append(" "+it.next().toString());
while (it.hasNext()) {
queueLists.append(" " + it.next().toString());
queueLists.append("\n");
}
queueLists.append("\n");
queueLists.append(" Region Merge Queue:\n");
lq = mergePool.getQueue();
@ -500,7 +509,8 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
// Note: please don't put single-compaction logic here;
// put it into region/store/etc. This is CST logic.
long start = EnvironmentEdgeManager.currentTime();
boolean completed = region.compact(compaction, store);
boolean completed =
region.compact(compaction, store, compactionThroughputController);
long now = EnvironmentEdgeManager.currentTime();
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@ -615,6 +625,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
this.mergePool.setCorePoolSize(smallThreads);
}
CompactionThroughputController old = this.compactionThroughputController;
if (old != null) {
old.stop("configuration change");
}
this.compactionThroughputController =
CompactionThroughputControllerFactory.create(server, newConf);
// We change this atomically here instead of reloading the config in order that upstream
// would be the only one with the flexibility to reload the config.
this.conf.reloadConfiguration();
@ -643,4 +660,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
public void deregisterChildren(ConfigurationManager manager) {
// No children to register
}
@VisibleForTesting
public CompactionThroughputController getCompactionThroughputController() {
return compactionThroughputController;
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@ -162,7 +163,8 @@ public class CompactionTool extends Configured implements Tool {
do {
CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
if (compaction == null) break;
List<StoreFile> storeFiles = store.compact(compaction);
List<StoreFile> storeFiles =
store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE);
if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) {
for (StoreFile storeFile: storeFiles) {

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@ -63,7 +64,6 @@ public class DefaultStoreEngine extends StoreEngine<
@Override
protected void createComponents(
Configuration conf, Store store, KVComparator kvComparator) throws IOException {
storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className,
@ -80,6 +80,7 @@ public class DefaultStoreEngine extends StoreEngine<
} catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
}
storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf());
className = conf.get(
DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
try {
@ -106,8 +107,9 @@ public class DefaultStoreEngine extends StoreEngine<
}
@Override
public List<Path> compact() throws IOException {
return compactor.compact(request);
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
return compactor.compact(request, throughputController);
}
@Override

View File

@ -27,11 +27,12 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
@ -45,7 +46,8 @@ class DefaultStoreFileManager implements StoreFileManager {
static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class);
private final KVComparator kvComparator;
private final Configuration conf;
private final CompactionConfiguration comConf;
private final int blockingFileCount;
/**
* List of store files inside this store. This is an immutable list that
@ -53,9 +55,12 @@ class DefaultStoreFileManager implements StoreFileManager {
*/
private volatile ImmutableList<StoreFile> storefiles = null;
public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf) {
public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf,
CompactionConfiguration comConf) {
this.kvComparator = kvComparator;
this.conf = conf;
this.comConf = comConf;
this.blockingFileCount =
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
}
@Override
@ -130,8 +135,6 @@ class DefaultStoreFileManager implements StoreFileManager {
@Override
public int getStoreCompactionPriority() {
int blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
int priority = blockingFileCount - storefiles.size();
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}
@ -162,5 +165,14 @@ class DefaultStoreFileManager implements StoreFileManager {
storefiles = ImmutableList.copyOf(storeFiles);
}
@Override
public double getCompactionPressure() {
int storefileCount = getStorefileCount();
int minFilesToCompact = comConf.getMinFilesToCompact();
if (storefileCount <= minFilesToCompact) {
return 0.0;
}
return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
}
}

View File

@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Flus
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
@ -1581,12 +1583,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
for (Store s : getStores().values()) {
CompactionContext compaction = s.requestCompaction();
if (compaction != null) {
compact(compaction, s);
compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
}
}
}
/*
/**
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
*
@ -1597,11 +1599,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*
* @param cr Compaction details, obtained by requestCompaction()
* @param compaction Compaction details, obtained by requestCompaction()
* @return whether the compaction completed
* @throws IOException e
*/
public boolean compact(CompactionContext compaction, Store store) throws IOException {
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
assert compaction != null && compaction.hasSelection();
assert !compaction.getRequest().getFiles().isEmpty();
if (this.closing.get() || this.closed.get()) {
@ -1650,7 +1652,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// We no longer need to cancel the request on the way out of this
// method because Store#compact will clean up unconditionally
requestNeedsCancellation = false;
store.compact(compaction);
store.compact(compaction, throughputController);
} catch (InterruptedIOException iioe) {
String msg = "compaction interrupted";
LOG.info(msg, iioe);

View File

@ -3146,4 +3146,18 @@ public class HRegionServer extends HasThread implements
public HeapMemoryManager getHeapMemoryManager() {
return hMemManager;
}
@Override
public double getCompactionPressure() {
double max = 0;
for (HRegion region : onlineRegions.values()) {
for (Store store : region.getStores().values()) {
double normCount = store.getCompactionPressure();
if (normCount > max) {
max = normCount;
}
}
}
return max;
}
}

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
@ -91,6 +92,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -931,7 +933,7 @@ public class HStore implements Store {
if (LOG.isInfoEnabled()) {
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
", sequenceid=" + logCacheFlushId +
", filesize=" + StringUtils.humanReadableInt(r.length()));
", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
}
return sf;
}
@ -1140,7 +1142,8 @@ public class HStore implements Store {
* @return Storefile we compacted into or null if we failed or opted out early.
*/
@Override
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest();;
@ -1162,10 +1165,10 @@ public class HStore implements Store {
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this + " of " + this.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + fs.getTempDir() + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
+ TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
// Commence the compaction.
List<Path> newFiles = compaction.compact();
List<Path> newFiles = compaction.compact(throughputController);
// TODO: get rid of this!
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
@ -1277,12 +1280,12 @@ public class HStore implements Store {
for (StoreFile sf: sfs) {
message.append(sf.getPath().getName());
message.append("(size=");
message.append(StringUtils.humanReadableInt(sf.getReader().length()));
message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
message.append("), ");
}
}
message.append("total size for store is ")
.append(StringUtils.humanReadableInt(storeSize))
.append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
.append(". This selection was in queue for ")
.append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
.append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
@ -1342,7 +1345,7 @@ public class HStore implements Store {
}
}
this.replaceStoreFiles(inputStoreFiles, Collections.EMPTY_LIST);
this.replaceStoreFiles(inputStoreFiles, Collections.<StoreFile>emptyList());
this.completeCompaction(inputStoreFiles);
}
@ -1558,7 +1561,7 @@ public class HStore implements Store {
completeCompaction(delSfs);
LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+ this + " of " + this.getRegionInfo().getRegionNameAsString()
+ "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+ "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
}
@Override
@ -1953,7 +1956,6 @@ public class HStore implements Store {
}
@Override
// TODO: why is there this and also getNumberOfStorefiles?! Remove one.
public int getStorefilesCount() {
return this.storeEngine.getStoreFileManager().getStorefileCount();
}
@ -2257,7 +2259,8 @@ public class HStore implements Store {
* Returns the StoreEngine that is backing this concrete implementation of Store.
* @return Returns the {@link StoreEngine} object used internally inside this HStore object.
*/
protected StoreEngine<?, ?, ?, ?> getStoreEngine() {
@VisibleForTesting
public StoreEngine<?, ?, ?, ?> getStoreEngine() {
return this.storeEngine;
}
@ -2292,4 +2295,9 @@ public class HStore implements Store {
public void deregisterChildren(ConfigurationManager manager) {
// No children to deregister
}
@Override
public double getCompactionPressure() {
return storeEngine.getStoreFileManager().getCompactionPressure();
}
}

View File

@ -153,4 +153,12 @@ public interface RegionServerServices
* @return heap memory manager instance
*/
HeapMemoryManager getHeapMemoryManager();
/**
* @return the max compaction pressure of all stores on this regionserver. The value should be
* greater than or equal to 0.0, and any value greater than 1.0 means we enter the
* emergency state that some stores have too many store files.
* @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
*/
double getCompactionPressure();
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.util.Pair;
/**
@ -188,7 +189,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void cancelRequestedCompaction(CompactionContext compaction);
List<StoreFile> compact(CompactionContext compaction) throws IOException;
List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException;
/**
* @return true if we should run a major compaction.
@ -396,5 +398,22 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* the primary region files.
* @throws IOException
*/
void refreshStoreFiles() throws IOException;
void refreshStoreFiles() throws IOException;
/**
* This value can represent the degree of emergency of compaction for this store. It should be
* greater than or equal to 0.0, any value greater than 1.0 means we have too many store files.
* <ul>
* <li>if getStorefilesCount &lt;= getMinFilesToCompact, return 0.0</li>
* <li>return (getStorefilesCount - getMinFilesToCompact) / (blockingFileCount -
* getMinFilesToCompact)</li>
* </ul>
* <p>
* And for striped stores, we should calculate this value by the files in each stripe separately
* and return the maximum value.
* <p>
* It is similar to {@link #getCompactPriority()} except that it is more suitable to use in a
* linear formula.
*/
double getCompactionPressure();
}

View File

@ -134,4 +134,10 @@ public interface StoreFileManager {
* @return The files which don't have any necessary data according to TTL and other criteria.
*/
Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting);
/**
* @return the compaction pressure used for compaction throughput tuning.
* @see Store#getCompactionPressure()
*/
double getCompactionPressure();
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import com.google.common.base.Preconditions;
@ -98,9 +99,10 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
}
@Override
public List<Path> compact() throws IOException {
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor);
return this.stripeRequest.execute(compactor, throughputController);
}
}
}

View File

@ -31,16 +31,16 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
@ -471,20 +471,23 @@ public class StripeStoreFileManager
if (!LOG.isDebugEnabled()) return;
StringBuilder sb = new StringBuilder();
sb.append("\n" + string + "; current stripe state is as such:");
sb.append("\n level 0 with ").append(state.level0Files.size())
sb.append("\n level 0 with ")
.append(state.level0Files.size())
.append(
" files: "
+ StringUtils.humanReadableInt(StripeCompactionPolicy
.getTotalFileSize(state.level0Files)) + ";");
+ TraditionalBinaryPrefix.long2String(
StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
for (int i = 0; i < state.stripeFiles.size(); ++i) {
String endRow = (i == state.stripeEndRows.length)
? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
sb.append("\n stripe ending in ").append(endRow).append(" with ")
sb.append("\n stripe ending in ")
.append(endRow)
.append(" with ")
.append(state.stripeFiles.get(i).size())
.append(
" files: "
+ StringUtils.humanReadableInt(StripeCompactionPolicy
.getTotalFileSize(state.stripeFiles.get(i))) + ";");
+ TraditionalBinaryPrefix.long2String(
StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
}
sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
sb.append("\n").append(getStorefileCount()).append(" files total.");
@ -955,4 +958,36 @@ public class StripeStoreFileManager
}
return expiredStoreFiles;
}
@Override
public double getCompactionPressure() {
State stateLocal = this.state;
if (stateLocal.allFilesCached.size() > blockingFileCount) {
// just a hit to tell others that we have reached the blocking file count.
return 2.0;
}
if (stateLocal.stripeFiles.isEmpty()) {
return 0.0;
}
int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
// do not calculate L0 separately because data will be moved to stripe quickly and in most cases
// we flush data to stripe directly.
int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
double max = 0.0;
for (ImmutableList<StoreFile> stripeFile : stateLocal.stripeFiles) {
int stripeFileCount = stripeFile.size();
double normCount =
(double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
/ (blockingFilePerStripe - config.getStripeCompactMinFiles());
if (normCount >= 1.0) {
// This could happen if stripe is not split evenly. Do not return values that larger than
// 1.0 because we have not reached the blocking file count actually.
return 1.0;
}
if (normCount > max) {
max = normCount;
}
}
return max;
}
}

View File

@ -68,7 +68,8 @@ public abstract class CompactionContext {
* Runs the compaction based on current selection. select/forceSelect must have been called.
* @return The new file paths resulting from compaction.
*/
public abstract List<Path> compact() throws IOException;
public abstract List<Path> compact(CompactionThroughputController throughputController)
throws IOException;
public CompactionRequest getRequest() {
assert hasSelection();

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* A utility that constrains the total throughput of one or more simultaneous flows (compactions) by
* sleeping when necessary.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public interface CompactionThroughputController extends Stoppable {
/**
* Setup controller for the given region server.
*/
void setup(RegionServerServices server);
/**
* Start a compaction.
*/
void start(String compactionName);
/**
* Control the compaction throughput. Will sleep if too fast.
* @return the actual sleep time.
*/
long control(String compactionName, long size) throws InterruptedException;
/**
* Finish a compaction. Should call this method in a finally block.
*/
void finish(String compactionName);
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class CompactionThroughputControllerFactory {
private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class);
public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
"hbase.regionserver.throughput.controller";
private static final Class<? extends CompactionThroughputController>
DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class;
public static CompactionThroughputController create(RegionServerServices server,
Configuration conf) {
Class<? extends CompactionThroughputController> clazz = getThroughputControllerClass(conf);
CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
controller.setup(server);
return controller;
}
public static Class<? extends CompactionThroughputController> getThroughputControllerClass(
Configuration conf) {
String className =
conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
try {
return Class.forName(className).asSubclass(CompactionThroughputController.class);
} catch (Exception e) {
LOG.warn(
"Unable to load configured throughput controller '" + className
+ "', load default throughput controller "
+ DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
return DEFAULT_THROUGHPUT_CONTROLLER_CLASS;
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -25,12 +26,12 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
@ -44,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
/**
* A compactor is a compaction algorithm associated a given policy. Base class also contains
@ -167,7 +168,7 @@ public abstract class Compactor {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
", bloomtype=" + r.getBloomFilterType().toString() +
", size=" + StringUtils.humanReadableInt(r.length()) +
", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
", encoding=" + r.getHFileReader().getDataBlockEncoding() +
", seqNum=" + seqNum +
(allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
@ -227,8 +228,9 @@ public abstract class Compactor {
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
protected boolean performCompaction(InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId,
CompactionThroughputController throughputController) throws IOException {
long bytesWritten = 0;
long bytesWrittenProgress = 0;
// Since scanner.next() can return 'false' but still be delivering data,
@ -239,49 +241,66 @@ public abstract class Compactor {
if (LOG.isDebugEnabled()) {
lastMillis = EnvironmentEdgeManager.currentTime();
}
String compactionName =
store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
long now = 0;
boolean hasMore;
do {
hasMore = scanner.next(cells, compactionKVMax);
if (LOG.isDebugEnabled()) {
now = EnvironmentEdgeManager.currentTime();
}
// output to writer:
for (Cell c : cells) {
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
CellUtil.setSequenceId(c, 0);
}
writer.append(c);
int len = KeyValueUtil.length(c);
++progress.currentCompactedKVs;
progress.totalCompactedSize += len;
throughputController.start(compactionName);
try {
do {
hasMore = scanner.next(cells, compactionKVMax);
if (LOG.isDebugEnabled()) {
bytesWrittenProgress += len;
now = EnvironmentEdgeManager.currentTime();
}
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += len;
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
if (!store.areWritesEnabled()) {
progress.cancel();
return false;
// output to writer:
for (Cell c : cells) {
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
CellUtil.setSequenceId(c, 0);
}
writer.append(c);
int len = KeyValueUtil.length(c);
++progress.currentCompactedKVs;
progress.totalCompactedSize += len;
if (LOG.isDebugEnabled()) {
bytesWrittenProgress += len;
}
throughputController.control(compactionName, len);
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += len;
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
if (!store.areWritesEnabled()) {
progress.cancel();
return false;
}
}
}
}
}
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= 60 * 1000) {
LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
(bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
lastMillis = now;
bytesWrittenProgress = 0;
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= 60 * 1000) {
LOG.debug("Compaction progress: "
+ compactionName
+ " "
+ progress
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ throughputController);
lastMillis = now;
bytesWrittenProgress = 0;
}
}
}
cells.clear();
} while (hasMore);
cells.clear();
} while (hasMore);
} catch (InterruptedException e) {
progress.cancel();
throw new InterruptedIOException("Interrupted while control throughput of compacting "
+ compactionName);
} finally {
throughputController.finish(compactionName);
}
progress.complete();
return true;
}

View File

@ -36,7 +36,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
/**
* Compact passed set of files. Create an instance and then call {@link #compact(CompactionRequest)}
* Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, CompactionThroughputController)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
@ -49,7 +50,8 @@ public class DefaultCompactor extends Compactor {
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
public List<Path> compact(final CompactionRequest request) throws IOException {
public List<Path> compact(final CompactionRequest request,
CompactionThroughputController throughputController) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
@ -99,7 +101,8 @@ public class DefaultCompactor extends Compactor {
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
true, fd.maxTagsLength > 0);
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
boolean finished =
performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
@ -144,7 +147,7 @@ public class DefaultCompactor extends Compactor {
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest)};
* {@link #compact(CompactionRequest, CompactionThroughputController)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
@ -156,6 +159,6 @@ public class DefaultCompactor extends Compactor {
throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return this.compact(cr);
return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE);
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* A dummy CompactionThroughputController that does nothing.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class NoLimitCompactionThroughputController implements CompactionThroughputController {
public static final NoLimitCompactionThroughputController INSTANCE =
new NoLimitCompactionThroughputController();
@Override
public void setup(RegionServerServices server) {
}
@Override
public void start(String compactionName) {
}
@Override
public long control(String compactionName, long size) throws InterruptedException {
return 0;
}
@Override
public void finish(String compactionName) {
}
private volatile boolean stopped;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
@Override
public String toString() {
return "NoLimitCompactionThroughputController";
}
}

View File

@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* A throughput controller which uses the follow schema to limit throughput
* <ul>
* <li>If compaction pressure is greater than 1.0, no limitation.</li>
* <li>In off peak hours, use a fixed throughput limitation
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
* <li>In normal hours, the max throughput is tune between
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower +
* (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li>
* </ul>
* @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class PressureAwareCompactionThroughputController extends Configured implements
CompactionThroughputController, Stoppable {
private final static Log LOG = LogFactory
.getLog(PressureAwareCompactionThroughputController.class);
public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
"hbase.hstore.compaction.throughput.higher.bound";
private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
20L * 1024 * 1024;
public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
"hbase.hstore.compaction.throughput.lower.bound";
private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
10L * 1024 * 1024;
public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK =
"hbase.hstore.compaction.throughput.offpeak";
private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD =
"hbase.hstore.compaction.throughput.tune.period";
private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
/**
* Stores the information of one controlled compaction.
*/
private static final class ActiveCompaction {
private final long startTime;
private long lastControlTime;
private long lastControlSize;
private long totalSize;
private long numberOfSleeps;
private long totalSleepTime;
// prevent too many debug log
private long lastLogTime;
ActiveCompaction() {
long currentTime = EnvironmentEdgeManager.currentTime();
this.startTime = currentTime;
this.lastControlTime = currentTime;
this.lastLogTime = currentTime;
}
}
private long maxThroughputHigherBound;
private long maxThroughputLowerBound;
private long maxThroughputOffpeak;
private OffPeakHours offPeakHours;
private long controlPerSize;
private int tuningPeriod;
volatile double maxThroughput;
private final ConcurrentMap<String, ActiveCompaction> activeCompactions =
new ConcurrentHashMap<String, ActiveCompaction>();
@Override
public void setup(final RegionServerServices server) {
server.getChoreService().scheduleChore(
new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
@Override
protected void chore() {
tune(server.getCompactionPressure());
}
});
}
private void tune(double compactionPressure) {
double maxThroughputToSet;
if (compactionPressure > 1.0) {
// set to unlimited if some stores already reach the blocking store file count
maxThroughputToSet = Double.MAX_VALUE;
} else if (offPeakHours.isOffPeakHour()) {
maxThroughputToSet = maxThroughputOffpeak;
} else {
// compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
* compactionPressure;
}
if (LOG.isDebugEnabled()) {
LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.maxThroughput = maxThroughputToSet;
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf == null) {
return;
}
this.maxThroughputHigherBound =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
this.maxThroughputLowerBound =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
this.maxThroughputOffpeak =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
this.offPeakHours = OffPeakHours.getInstance(conf);
this.controlPerSize = this.maxThroughputLowerBound;
this.maxThroughput = this.maxThroughputLowerBound;
this.tuningPeriod =
getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
LOG.info("Compaction throughput configurations, higher bound: "
+ throughputDesc(maxThroughputHigherBound) + ", lower bound "
+ throughputDesc(maxThroughputLowerBound) + ", off peak: "
+ throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
}
private String throughputDesc(long deltaSize, long elapsedTime) {
return throughputDesc((double) deltaSize / elapsedTime * 1000);
}
private String throughputDesc(double speed) {
if (speed >= 1E15) { // large enough to say it is unlimited
return "unlimited";
} else {
return String.format("%.2f MB/sec", speed / 1024 / 1024);
}
}
@Override
public void start(String compactionName) {
activeCompactions.put(compactionName, new ActiveCompaction());
}
@Override
public long control(String compactionName, long size) throws InterruptedException {
ActiveCompaction compaction = activeCompactions.get(compactionName);
compaction.totalSize += size;
long deltaSize = compaction.totalSize - compaction.lastControlSize;
if (deltaSize < controlPerSize) {
return 0;
}
long now = EnvironmentEdgeManager.currentTime();
double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
long elapsedTime = now - compaction.lastControlTime;
compaction.lastControlSize = compaction.totalSize;
if (elapsedTime >= minTimeAllowed) {
compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
return 0;
}
// too fast
long sleepTime = minTimeAllowed - elapsedTime;
if (LOG.isDebugEnabled()) {
// do not log too much
if (now - compaction.lastLogTime > 60L * 1000) {
LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
+ throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ throughputDesc(maxThroughputPerCompaction) + ", already slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms till now.");
compaction.lastLogTime = now;
}
}
Thread.sleep(sleepTime);
compaction.numberOfSleeps++;
compaction.totalSleepTime += sleepTime;
compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
return sleepTime;
}
@Override
public void finish(String compactionName) {
ActiveCompaction compaction = activeCompactions.remove(compactionName);
long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime);
LOG.info(compactionName + " average throughput is "
+ throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms. " + activeCompactions.size()
+ " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
}
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
@Override
public String toString() {
return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput)
+ ", activeCompactions=" + activeCompactions.size() + "]";
}
}

View File

@ -396,7 +396,8 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @param compactor Compactor.
* @return result of compact(...)
*/
public abstract List<Path> execute(StripeCompactor compactor) throws IOException;
public abstract List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException;
public StripeCompactionRequest(CompactionRequest request) {
this.request = request;
@ -447,9 +448,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
}
@Override
public List<Path> execute(StripeCompactor compactor) throws IOException {
return compactor.compact(
this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException {
return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
this.majorRangeToRow, throughputController);
}
}
@ -497,9 +499,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
}
@Override
public List<Path> execute(StripeCompactor compactor) throws IOException {
return compactor.compact(this.request, this.targetCount, this.targetKvs,
this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException {
return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController);
}
/** Set major range of the compaction to the entire compaction range.

View File

@ -51,7 +51,8 @@ public class StripeCompactor extends Compactor {
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException {
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@ -62,12 +63,13 @@ public class StripeCompactor extends Compactor {
}
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
targetBoundaries, majorRangeFromRow, majorRangeToRow);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
throughputController);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow)
throws IOException {
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize
+ " target file size, no more than " + targetCount + " files, in ["
@ -75,11 +77,13 @@ public class StripeCompactor extends Compactor {
}
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
targetCount, targetSize, left, right);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
throughputController);
}
private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException {
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
@ -126,7 +130,8 @@ public class StripeCompactor extends Compactor {
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
finished =
performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController);
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +

View File

@ -291,4 +291,9 @@ public class MockRegionServerServices implements RegionServerServices {
public HeapMemoryManager getHeapMemoryManager() {
return null;
}
@Override
public double getCompactionPressure() {
return 0;
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -117,14 +118,17 @@ public class TestIOFencing {
throw new IOException(ex);
}
}
@Override
public boolean compact(CompactionContext compaction, Store store) throws IOException {
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
try {
return super.compact(compaction, store);
return super.compact(compaction, store, throughputController);
} finally {
compactCount++;
}
}
public int countStoreFiles() {
int count = 0;
for (Store store : stores.values()) {

View File

@ -407,7 +407,6 @@ public class TestHCM {
return step.get() == 3;
}
});
table.close();
connection.close();
Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -229,9 +230,11 @@ public class TestRegionObserverScannerOpenHook {
if (compactionStateChangeLatch == null) compactionStateChangeLatch = new CountDownLatch(1);
return compactionStateChangeLatch;
}
@Override
public boolean compact(CompactionContext compaction, Store store) throws IOException {
boolean ret = super.compact(compaction, store);
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
boolean ret = super.compact(compaction, store, throughputController);
if (ret) compactionStateChangeLatch.countDown();
return ret;
}

View File

@ -626,4 +626,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public HeapMemoryManager getHeapMemoryManager() {
return null;
}
@Override
public double getCompactionPressure() {
return 0;
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
@ -59,6 +60,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -99,8 +103,10 @@ public class TestCompaction {
super();
// Set cache flush size to 1MB
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName());
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
secondRowBytes = START_KEY_BYTES.clone();
@ -359,7 +365,8 @@ public class TestCompaction {
}
@Override
public List<Path> compact() throws IOException {
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
finishCompaction(this.selectedFiles);
return new ArrayList<Path>();
}
@ -410,12 +417,15 @@ public class TestCompaction {
}
@Override
public List<Path> compact() throws IOException {
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
try {
isInCompact = true;
synchronized (this) { this.wait(); }
synchronized (this) {
this.wait();
}
} catch (InterruptedException e) {
Assume.assumeNoException(e);
Assume.assumeNoException(e);
}
return new ArrayList<Path>();
}
@ -481,14 +491,18 @@ public class TestCompaction {
HRegionServer mockServer = mock(HRegionServer.class);
when(mockServer.isStopped()).thenReturn(false);
when(mockServer.getConfiguration()).thenReturn(conf);
when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
CompactSplitThread cst = new CompactSplitThread(mockServer);
when(mockServer.getCompactSplitThread()).thenReturn(cst);
// Set up the region mock that redirects compactions.
HRegion r = mock(HRegion.class);
when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer<Boolean>() {
when(
r.compact(any(CompactionContext.class), any(Store.class),
any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
public Boolean answer(InvocationOnMock invocation) throws Throwable {
((CompactionContext)invocation.getArguments()[0]).compact();
invocation.getArgumentAt(0, CompactionContext.class).compact(
invocation.getArgumentAt(2, CompactionThroughputController.class));
return true;
}
});

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -257,7 +258,7 @@ public class TestSplitTransactionOnCluster {
region.initialize();
// 2, Run Compaction cc
assertFalse(region.compact(cc, store));
assertFalse(region.compact(cc, store, NoLimitCompactionThroughputController.INSTANCE));
assertTrue(fileNum > store.getStorefiles().size());
// 3, Split

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User;
@ -374,7 +375,7 @@ public class TestStore {
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
// after compact; check the lowest time stamp
store.compact(store.requestCompaction());
store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@ -121,7 +122,8 @@ public class TestStripeCompactor {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo);
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
NoLimitCompactionThroughputController.INSTANCE);
writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
@ -156,8 +158,9 @@ public class TestStripeCompactor {
byte[] left, byte[] right, KeyValue[][] output) throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths = sc.compact(
createDummyRequest(), targetCount, targetSize, left, right, null, null);
List<Path> paths =
sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
NoLimitCompactionThroughputController.INSTANCE);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>();

View File

@ -17,8 +17,16 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
@ -27,12 +35,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -63,9 +73,10 @@ public class TestStripeStoreEngine {
TestStoreEngine se = createEngine(conf);
StripeCompactor mockCompactor = mock(StripeCompactor.class);
se.setCompactorOverride(mockCompactor);
when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(),
any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class)))
.thenReturn(new ArrayList<Path>());
when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class),
any(CompactionThroughputController.class))).thenReturn(new ArrayList<Path>());
// Produce 3 L0 files.
StoreFile sf = createFile();
@ -83,9 +94,10 @@ public class TestStripeStoreEngine {
assertEquals(2, compaction.getRequest().getFiles().size());
assertFalse(compaction.getRequest().getFiles().contains(sf));
// Make sure the correct method it called on compactor.
compaction.compact();
compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null);
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
NoLimitCompactionThroughputController.INSTANCE);
}
private static StoreFile createFile() throws Exception {

View File

@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompactionWithThroughputController {
private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final double EPSILON = 1E-6;
private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
private final byte[] family = Bytes.toBytes("f");
private final byte[] qualifier = Bytes.toBytes("q");
private Store getStoreWithName(TableName tableName) {
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = rsts.get(i).getRegionServer();
for (HRegion region : hrs.getOnlineRegions(tableName)) {
return region.getStores().values().iterator().next();
}
}
return null;
}
private Store prepareData() throws IOException {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTable table = TEST_UTIL.createTable(tableName, family);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[128 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value));
}
admin.flush(tableName);
}
return getStoreWithName(tableName);
}
private long testCompactionWithThroughputLimit() throws Exception {
long throughputLimit = 1024L * 1024;
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
conf.setLong(
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
throughputLimit);
conf.setLong(
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
throughputLimit);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
PressureAwareCompactionThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1);
try {
Store store = prepareData();
assertEquals(10, store.getStorefilesCount());
long startTime = System.currentTimeMillis();
TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
Thread.sleep(5000);
assertEquals(10, store.getStorefilesCount());
while (store.getStorefilesCount() != 1) {
Thread.sleep(20);
}
long duration = System.currentTimeMillis() - startTime;
double throughput = (double) store.getStorefilesSize() / duration * 1000;
// confirm that the speed limit work properly(not too fast, and also not too slow)
// 20% is the max acceptable error rate.
assertTrue(throughput < throughputLimit * 1.2);
assertTrue(throughput > throughputLimit * 0.8);
return System.currentTimeMillis() - startTime;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
private long testCompactionWithoutThroughputLimit() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1);
try {
Store store = prepareData();
assertEquals(10, store.getStorefilesCount());
long startTime = System.currentTimeMillis();
TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
while (store.getStorefilesCount() != 1) {
Thread.sleep(20);
}
return System.currentTimeMillis() - startTime;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testCompaction() throws Exception {
long limitTime = testCompactionWithThroughputLimit();
long noLimitTime = testCompactionWithoutThroughputLimit();
LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
+ noLimitTime + "ms");
// usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
// is a very weak assumption.
assertTrue(limitTime > noLimitTime * 2);
}
/**
* Test the tuning task of {@link PressureAwareCompactionThroughputController}
*/
@Test
public void testThroughputTuning() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setLong(
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
20L * 1024 * 1024);
conf.setLong(
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
10L * 1024 * 1024);
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
PressureAwareCompactionThroughputController.class.getName());
conf.setInt(
PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
1000);
TEST_UTIL.startMiniCluster(1);
Connection conn = ConnectionFactory.createConnection(conf);
try {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
htd.setCompactionEnabled(false);
TEST_UTIL.getHBaseAdmin().createTable(htd);
TEST_UTIL.waitTableAvailable(tableName);
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
PressureAwareCompactionThroughputController throughputController =
(PressureAwareCompactionThroughputController) regionServer.compactSplitThread
.getCompactionThroughputController();
assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
Table table = conn.getTable(tableName);
for (int i = 0; i < 5; i++) {
table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
}
Thread.sleep(2000);
assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
Thread.sleep(2000);
assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
Thread.sleep(2000);
assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName());
regionServer.compactSplitThread.onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
instanceof NoLimitCompactionThroughputController);
} finally {
conn.close();
TEST_UTIL.shutdownMiniCluster();
}
}
/**
* Test the logic that we calculate compaction pressure for a striped store.
*/
@Test
public void testGetCompactionPressureForStripedStore() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
TEST_UTIL.startMiniCluster(1);
Connection conn = ConnectionFactory.createConnection(conf);
try {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
htd.setCompactionEnabled(false);
TEST_UTIL.getHBaseAdmin().createTable(htd);
TEST_UTIL.waitTableAvailable(tableName);
HStore store = (HStore) getStoreWithName(tableName);
assertEquals(0, store.getStorefilesCount());
assertEquals(0.0, store.getCompactionPressure(), EPSILON);
Table table = conn.getTable(tableName);
for (int i = 0; i < 4; i++) {
table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
}
assertEquals(8, store.getStorefilesCount());
assertEquals(0.0, store.getCompactionPressure(), EPSILON);
table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
assertEquals(10, store.getStorefilesCount());
assertEquals(0.5, store.getCompactionPressure(), EPSILON);
table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
assertEquals(12, store.getStorefilesCount());
assertEquals(1.0, store.getCompactionPressure(), EPSILON);
table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
TEST_UTIL.flush(tableName);
assertEquals(14, store.getStorefilesCount());
assertEquals(2.0, store.getCompactionPressure(), EPSILON);
} finally {
conn.close();
TEST_UTIL.shutdownMiniCluster();
}
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -211,9 +212,10 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
scr.execute(sc);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY));
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(NoLimitCompactionThroughputController.class));
}
@Test
@ -453,7 +455,7 @@ public class TestStripeCompactionPolicy {
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create
// an empty file to preserve metadata
StripeCompactor sc = createCompactor();
List<Path> paths = scr.execute(sc);
List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
assertEquals(1, paths.size());
}
@ -512,22 +514,21 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc);
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(
new ArgumentMatcher<List<byte[]>>() {
@Override
public boolean matches(Object argument) {
@SuppressWarnings("unchecked")
List<byte[]> other = (List<byte[]>)argument;
if (other.size() != boundaries.size()) return false;
for (int i = 0; i < other.size(); ++i) {
if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
}
return true;
}
}),
dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo));
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
@Override
public boolean matches(Object argument) {
@SuppressWarnings("unchecked")
List<byte[]> other = (List<byte[]>) argument;
if (other.size() != boundaries.size()) return false;
for (int i = 0; i < other.size(); ++i) {
if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
}
return true;
}
}), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
any(NoLimitCompactionThroughputController.class));
}
/**
@ -548,11 +549,12 @@ public class TestStripeCompactionPolicy {
assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc);
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
verify(sc, times(1)).compact(eq(scr.getRequest()),
count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end));
count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
any(NoLimitCompactionThroughputController.class));
}
/** Verify arbitrary flush. */
@ -738,7 +740,10 @@ public class TestStripeCompactionPolicy {
HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
StoreFileWritersCapture writers = new StoreFileWritersCapture();
Store store = mock(Store.class);
HRegionInfo info = mock(HRegionInfo.class);
when(info.getRegionNameAsString()).thenReturn("testRegion");
when(store.getFamily()).thenReturn(col);
when(store.getRegionInfo()).thenReturn(info);
when(
store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean())).thenAnswer(writers);