From 07962e24a57506226ebae18b5f5fd93ef99056f9 Mon Sep 17 00:00:00 2001 From: Nicolas Spiegelberg Date: Tue, 15 May 2012 17:54:06 +0000 Subject: [PATCH] [HBASE-5867] Improve Compaction Throttle Default Summary: We recently had a production issue where our compactions fell behind because our compaction throttle was improperly tuned and accidentally upgraded all compactions to the large pool. The default from HBASE-3877 makes 1 bad assumption: the default number of flushed files in a compaction. MinFilesToCompact should be taken into consideration. As a default, it is less damaging for the large thread to be slightly higher than it needs to be and only get timed-majors versus having everything accidentally promoted. Test Plan: - mvn test Reviewers: JIRA, Kannan, Liyin Reviewed By: Kannan CC: stack Differential Revision: https://reviews.facebook.net/D2943 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1338809 13f79535-47bb-0310-9956-ffa450edef68 --- .../regionserver/CompactSplitThread.java | 74 ++++++------------- .../hadoop/hbase/regionserver/Store.java | 8 ++ 2 files changed, 30 insertions(+), 52 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 1081fc625f9..755f51b427b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -49,7 +49,6 @@ public class CompactSplitThread implements CompactionRequestor { private final ThreadPoolExecutor largeCompactions; private final ThreadPoolExecutor smallCompactions; private final ThreadPoolExecutor splits; - private final long throttleSize; /* The default priority for user-specified compaction requests. * The user gets top priority unless we have blocking compactions. (Pri <= 0) @@ -76,22 +75,11 @@ public class CompactSplitThread implements CompactionRequestor { "hbase.regionserver.thread.compaction.large", 1)); int smallThreads = conf.getInt( "hbase.regionserver.thread.compaction.small", 1); - if (conf.get("hbase.regionserver.thread.compaction.throttle") != null) { - throttleSize = conf.getLong( - "hbase.regionserver.thread.compaction.throttle", 0); - } else { - // we have a complicated default. see HBASE-3877 - long flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, - HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); - long splitSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, - HConstants.DEFAULT_MAX_FILE_SIZE); - throttleSize = Math.min(flushSize * 2, splitSize / 2); - } int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1); // if we have throttle threads, make sure the user also specified size - Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0); + Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); final String n = Thread.currentThread().getName(); @@ -107,22 +95,18 @@ public class CompactSplitThread implements CompactionRequestor { }); this.largeCompactions .setRejectedExecutionHandler(new CompactionRequest.Rejection()); - if (smallThreads <= 0) { - this.smallCompactions = null; - } else { - this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName(n + "-smallCompactions-" + System.currentTimeMillis()); - return t; - } - }); - this.smallCompactions - .setRejectedExecutionHandler(new CompactionRequest.Rejection()); - } + this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, + 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(n + "-smallCompactions-" + System.currentTimeMillis()); + return t; + } + }); + this.smallCompactions + .setRejectedExecutionHandler(new CompactionRequest.Rejection()); this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() { @@ -137,11 +121,9 @@ public class CompactSplitThread implements CompactionRequestor { @Override public String toString() { - return "compaction_queue=" - + (smallCompactions != null ? "(" - + largeCompactions.getQueue().size() + ":" - + smallCompactions.getQueue().size() + ")" - : largeCompactions.getQueue().size()) + return "compaction_queue=(" + + largeCompactions.getQueue().size() + ":" + + smallCompactions.getQueue().size() + ")" + ", split_queue=" + splits.getQueue().size(); } @@ -209,17 +191,11 @@ public class CompactSplitThread implements CompactionRequestor { if (priority != NO_PRIORITY) { cr.setPriority(priority); } - ThreadPoolExecutor pool = largeCompactions; - if (smallCompactions != null && throttleSize > cr.getSize()) { - // smallCompactions is like the 10 items or less line at Walmart - pool = smallCompactions; - } + ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize()) + ? largeCompactions : smallCompactions; pool.execute(cr); if (LOG.isDebugEnabled()) { - String type = ""; - if (smallCompactions != null) { - type = (pool == smallCompactions) ? "Small " : "Large "; - } + String type = (pool == smallCompactions) ? "Small " : "Large "; LOG.debug(type + "Compaction requested: " + cr + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); @@ -233,8 +209,7 @@ public class CompactSplitThread implements CompactionRequestor { void interruptIfNecessary() { splits.shutdown(); largeCompactions.shutdown(); - if (smallCompactions != null) - smallCompactions.shutdown(); + smallCompactions.shutdown(); } private void waitFor(ThreadPoolExecutor t, String name) { @@ -252,9 +227,7 @@ public class CompactSplitThread implements CompactionRequestor { void join() { waitFor(splits, "Split Thread"); waitFor(largeCompactions, "Large Compaction Thread"); - if (smallCompactions != null) { - waitFor(smallCompactions, "Small Compaction Thread"); - } + waitFor(smallCompactions, "Small Compaction Thread"); } /** @@ -264,10 +237,7 @@ public class CompactSplitThread implements CompactionRequestor { * @return The current size of the regions queue. */ public int getCompactionQueueSize() { - int size = largeCompactions.getQueue().size(); - if (smallCompactions != null) - size += smallCompactions.getQueue().size(); - return size; + return largeCompactions.getQueue().size() + smallCompactions.getQueue().size(); } private boolean shouldSplitRegion() { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index bf1618e267e..0b482c3c0b4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1998,6 +1998,14 @@ public class Store extends SchemaConfigured implements HeapSize { return this.blockingStoreFileCount - this.storefiles.size(); } + boolean throttleCompaction(long compactionSize) { + // see HBASE-5867 for discussion on the default + long throttlePoint = conf.getLong( + "hbase.regionserver.thread.compaction.throttle", + 2 * this.minFilesToCompact * this.region.memstoreFlushSize); + return compactionSize > throttlePoint; + } + HRegion getHRegion() { return this.region; }