[HBASE-5867] Improve Compaction Throttle Default
Summary: We recently had a production issue where our compactions fell behind because our compaction throttle was improperly tuned and accidentally upgraded all compactions to the large pool. The default from HBASE-3877 makes 1 bad assumption: the default number of flushed files in a compaction. MinFilesToCompact should be taken into consideration. As a default, it is less damaging for the large thread to be slightly higher than it needs to be and only get timed-majors versus having everything accidentally promoted. Test Plan: - mvn test Reviewers: JIRA, Kannan, Liyin Reviewed By: Kannan CC: stack Differential Revision: https://reviews.facebook.net/D2943 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1338809 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e8560bf973
commit
07962e24a5
|
@ -49,7 +49,6 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
private final ThreadPoolExecutor largeCompactions;
|
||||
private final ThreadPoolExecutor smallCompactions;
|
||||
private final ThreadPoolExecutor splits;
|
||||
private final long throttleSize;
|
||||
|
||||
/* The default priority for user-specified compaction requests.
|
||||
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
|
||||
|
@ -76,22 +75,11 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
"hbase.regionserver.thread.compaction.large", 1));
|
||||
int smallThreads = conf.getInt(
|
||||
"hbase.regionserver.thread.compaction.small", 1);
|
||||
if (conf.get("hbase.regionserver.thread.compaction.throttle") != null) {
|
||||
throttleSize = conf.getLong(
|
||||
"hbase.regionserver.thread.compaction.throttle", 0);
|
||||
} else {
|
||||
// we have a complicated default. see HBASE-3877
|
||||
long flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
|
||||
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
|
||||
long splitSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
|
||||
HConstants.DEFAULT_MAX_FILE_SIZE);
|
||||
throttleSize = Math.min(flushSize * 2, splitSize / 2);
|
||||
}
|
||||
|
||||
int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
|
||||
|
||||
// if we have throttle threads, make sure the user also specified size
|
||||
Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0);
|
||||
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
|
||||
|
||||
final String n = Thread.currentThread().getName();
|
||||
|
||||
|
@ -107,22 +95,18 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
});
|
||||
this.largeCompactions
|
||||
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
|
||||
if (smallThreads <= 0) {
|
||||
this.smallCompactions = null;
|
||||
} else {
|
||||
this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
|
||||
60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
|
||||
return t;
|
||||
}
|
||||
});
|
||||
this.smallCompactions
|
||||
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
|
||||
}
|
||||
this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
|
||||
60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
|
||||
return t;
|
||||
}
|
||||
});
|
||||
this.smallCompactions
|
||||
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
|
||||
this.splits = (ThreadPoolExecutor)
|
||||
Executors.newFixedThreadPool(splitThreads,
|
||||
new ThreadFactory() {
|
||||
|
@ -137,11 +121,9 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "compaction_queue="
|
||||
+ (smallCompactions != null ? "("
|
||||
+ largeCompactions.getQueue().size() + ":"
|
||||
+ smallCompactions.getQueue().size() + ")"
|
||||
: largeCompactions.getQueue().size())
|
||||
return "compaction_queue=("
|
||||
+ largeCompactions.getQueue().size() + ":"
|
||||
+ smallCompactions.getQueue().size() + ")"
|
||||
+ ", split_queue=" + splits.getQueue().size();
|
||||
}
|
||||
|
||||
|
@ -209,17 +191,11 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
if (priority != NO_PRIORITY) {
|
||||
cr.setPriority(priority);
|
||||
}
|
||||
ThreadPoolExecutor pool = largeCompactions;
|
||||
if (smallCompactions != null && throttleSize > cr.getSize()) {
|
||||
// smallCompactions is like the 10 items or less line at Walmart
|
||||
pool = smallCompactions;
|
||||
}
|
||||
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
|
||||
? largeCompactions : smallCompactions;
|
||||
pool.execute(cr);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
String type = "";
|
||||
if (smallCompactions != null) {
|
||||
type = (pool == smallCompactions) ? "Small " : "Large ";
|
||||
}
|
||||
String type = (pool == smallCompactions) ? "Small " : "Large ";
|
||||
LOG.debug(type + "Compaction requested: " + cr
|
||||
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
|
||||
+ "; " + this);
|
||||
|
@ -233,8 +209,7 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
void interruptIfNecessary() {
|
||||
splits.shutdown();
|
||||
largeCompactions.shutdown();
|
||||
if (smallCompactions != null)
|
||||
smallCompactions.shutdown();
|
||||
smallCompactions.shutdown();
|
||||
}
|
||||
|
||||
private void waitFor(ThreadPoolExecutor t, String name) {
|
||||
|
@ -252,9 +227,7 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
void join() {
|
||||
waitFor(splits, "Split Thread");
|
||||
waitFor(largeCompactions, "Large Compaction Thread");
|
||||
if (smallCompactions != null) {
|
||||
waitFor(smallCompactions, "Small Compaction Thread");
|
||||
}
|
||||
waitFor(smallCompactions, "Small Compaction Thread");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -264,10 +237,7 @@ public class CompactSplitThread implements CompactionRequestor {
|
|||
* @return The current size of the regions queue.
|
||||
*/
|
||||
public int getCompactionQueueSize() {
|
||||
int size = largeCompactions.getQueue().size();
|
||||
if (smallCompactions != null)
|
||||
size += smallCompactions.getQueue().size();
|
||||
return size;
|
||||
return largeCompactions.getQueue().size() + smallCompactions.getQueue().size();
|
||||
}
|
||||
|
||||
private boolean shouldSplitRegion() {
|
||||
|
|
|
@ -1998,6 +1998,14 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
return this.blockingStoreFileCount - this.storefiles.size();
|
||||
}
|
||||
|
||||
boolean throttleCompaction(long compactionSize) {
|
||||
// see HBASE-5867 for discussion on the default
|
||||
long throttlePoint = conf.getLong(
|
||||
"hbase.regionserver.thread.compaction.throttle",
|
||||
2 * this.minFilesToCompact * this.region.memstoreFlushSize);
|
||||
return compactionSize > throttlePoint;
|
||||
}
|
||||
|
||||
HRegion getHRegion() {
|
||||
return this.region;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue