diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 6a054dcf113..68c4d974f9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -56,8 +56,8 @@ public class CompactSplitThread implements CompactionRequestor {
 
   private final HRegionServer server;
   private final Configuration conf;
 
-  private final ThreadPoolExecutor largeCompactions;
-  private final ThreadPoolExecutor smallCompactions;
+  private final ThreadPoolExecutor longCompactions;
+  private final ThreadPoolExecutor shortCompactions;
   private final ThreadPoolExecutor splits;
   private final ThreadPoolExecutor mergePool;
@@ -88,28 +88,28 @@ public class CompactSplitThread implements CompactionRequestor {
 
     final String n = Thread.currentThread().getName();
 
-    this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
         new ThreadFactory() {
           @Override
           public Thread newThread(Runnable r) {
             Thread t = new Thread(r);
-            t.setName(n + "-largeCompactions-" + System.currentTimeMillis());
+            t.setName(n + "-longCompactions-" + System.currentTimeMillis());
             return t;
           }
       });
-    this.largeCompactions.setRejectedExecutionHandler(new Rejection());
-    this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
         new ThreadFactory() {
           @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
-           t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
+           t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
            return t;
          }
      });
-    this.smallCompactions
+    this.shortCompactions
        .setRejectedExecutionHandler(new Rejection());
     this.splits = (ThreadPoolExecutor)
         Executors.newFixedThreadPool(splitThreads,
@@ -136,8 +136,8 @@ public class CompactSplitThread implements CompactionRequestor {
   @Override
   public String toString() {
     return "compaction_queue=("
-        + largeCompactions.getQueue().size() + ":"
-        + smallCompactions.getQueue().size() + ")"
+        + longCompactions.getQueue().size() + ":"
+        + shortCompactions.getQueue().size() + ")"
        + ", split_queue=" + splits.getQueue().size()
        + ", merge_queue=" + mergePool.getQueue().size();
   }
@@ -146,17 +146,17 @@ public class CompactSplitThread implements CompactionRequestor {
     StringBuffer queueLists = new StringBuffer();
     queueLists.append("Compaction/Split Queue dump:\n");
     queueLists.append("  LargeCompation Queue:\n");
-    BlockingQueue<Runnable> lq = largeCompactions.getQueue();
+    BlockingQueue<Runnable> lq = longCompactions.getQueue();
     Iterator<Runnable> it = lq.iterator();
     while(it.hasNext()){
       queueLists.append("    "+it.next().toString());
       queueLists.append("\n");
     }
 
-    if( smallCompactions != null ){
+    if( shortCompactions != null ){
       queueLists.append("\n");
       queueLists.append("  SmallCompation Queue:\n");
-      lq = smallCompactions.getQueue();
+      lq = shortCompactions.getQueue();
       it = lq.iterator();
       while(it.hasNext()){
         queueLists.append("    "+it.next().toString());
@@ -312,10 +312,10 @@ public class CompactSplitThread implements CompactionRequestor {
     // pool; we will do selection there, and move to large pool if necessary.
     long size = selectNow ? compaction.getRequest().getSize() : 0;
     ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
-      ? largeCompactions : smallCompactions;
+      ? longCompactions : shortCompactions;
     pool.execute(new CompactionRunner(s, r, compaction, pool));
     if (LOG.isDebugEnabled()) {
-      String type = (pool == smallCompactions) ? "Small " : "Large ";
+      String type = (pool == shortCompactions) ? "Small " : "Large ";
       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
     }
@@ -345,8 +345,8 @@ public class CompactSplitThread implements CompactionRequestor {
   void interruptIfNecessary() {
     splits.shutdown();
     mergePool.shutdown();
-    largeCompactions.shutdown();
-    smallCompactions.shutdown();
+    longCompactions.shutdown();
+    shortCompactions.shutdown();
   }
 
   private void waitFor(ThreadPoolExecutor t, String name) {
@@ -367,8 +367,8 @@ public class CompactSplitThread implements CompactionRequestor {
   void join() {
     waitFor(splits, "Split Thread");
     waitFor(mergePool, "Merge Thread");
-    waitFor(largeCompactions, "Large Compaction Thread");
-    waitFor(smallCompactions, "Small Compaction Thread");
+    waitFor(longCompactions, "Large Compaction Thread");
+    waitFor(shortCompactions, "Small Compaction Thread");
   }
 
   /**
@@ -378,16 +378,16 @@ public class CompactSplitThread implements CompactionRequestor {
    * @return The current size of the regions queue.
    */
   public int getCompactionQueueSize() {
-    return largeCompactions.getQueue().size() + smallCompactions.getQueue().size();
+    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
   }
 
   public int getLargeCompactionQueueSize() {
-    return largeCompactions.getQueue().size();
+    return longCompactions.getQueue().size();
   }
 
   public int getSmallCompactionQueueSize() {
-    return smallCompactions.getQueue().size();
+    return shortCompactions.getQueue().size();
   }
 
@@ -455,7 +455,7 @@ public class CompactSplitThread implements CompactionRequestor {
           // We might end up waiting for a while, so cancel the selection.
           assert this.compaction.hasSelection();
           ThreadPoolExecutor pool = store.throttleCompaction(
-            compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
+            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
           if (this.parent != pool) {
             this.store.cancelRequestedCompaction(this.compaction);
             this.compaction = null;
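Note on the rename above: the two executors are configured identically apart from thread counts, so "long" and "short" describe the expected runtime of the jobs routed to them rather than the pools themselves. Routing is decided by Store.throttleCompaction(size), which compares a compaction's total input size against the throttle point (hbase.regionserver.thread.compaction.throttle). A minimal standalone sketch of that routing idea, assuming a fixed throttle value; the class name, pool sizes, and constant below are illustrative, not HBase's API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoPoolRouter {
  // Illustrative stand-in for the throttle point configured via
  // hbase.regionserver.thread.compaction.throttle.
  private static final long THROTTLE_POINT = 256L * 1024 * 1024;

  private final ExecutorService longCompactions = Executors.newFixedThreadPool(1);
  private final ExecutorService shortCompactions = Executors.newFixedThreadPool(3);

  // Route by total input size, mirroring the throttleCompaction(size) test.
  public void submit(long inputSizeBytes, Runnable compactionJob) {
    ExecutorService pool =
        (inputSizeBytes > THROTTLE_POINT) ? longCompactions : shortCompactions;
    pool.execute(compactionJob);
  }

  public static void main(String[] args) {
    TwoPoolRouter router = new TwoPoolRouter();
    router.submit(300L << 20, () -> System.out.println("routed to long pool"));
    router.submit(10L << 20, () -> System.out.println("routed to short pool"));
    // Previously submitted tasks still run; shutdown just lets the JVM exit.
    router.longCompactions.shutdown();
    router.shortCompactions.shutdown();
  }
}

The same size test reappears inside CompactionRunner (the @@ -455 hunk), so a system compaction whose selection turns out to be large can be cancelled and re-queued to the other pool.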
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a5903d8b869..9c540c30655 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4527,19 +4527,6 @@ public class HRegion implements HeapSize { // , Writable{
     return dstRegion;
   }
 
-  /**
-   * @return True if needs a major compaction.
-   * @throws IOException
-   */
-  boolean isMajorCompaction() throws IOException {
-    for (Store store : this.stores.values()) {
-      if (store.isMajorCompaction()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   //
   // HBASE-880
   //
@@ -5802,7 +5789,7 @@ public class HRegion implements HeapSize { // , Writable{
     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
   }
 
-  public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted){
+  public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
 
     // metrics
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ef2d358b30d..7a06fca165c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1163,10 +1163,9 @@ public class HStore implements Store {
       CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     StringBuilder message = new StringBuilder(
-      "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + cr.getFiles().size() + " file(s) in " + this + " of "
-        + this.getRegionInfo().getRegionNameAsString()
-        + " into ");
+      "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
+        + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
+        + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
     if (sfs.isEmpty()) {
       message.append("none, ");
     } else {
@@ -1342,6 +1341,7 @@ public class HStore implements Store {
     }
 
     CompactionContext compaction = storeEngine.createCompaction();
+    CompactionRequest request = null;
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
@@ -1388,9 +1388,9 @@ public class HStore implements Store {
           compaction.forceSelect(
               baseRequest.combineWith(compaction.getRequest()));
         }
-
         // Finally, we have the resulting files list. Check if we have any files at all.
-        final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
+        request = compaction.getRequest();
+        final Collection<StoreFile> selectedFiles = request.getFiles();
         if (selectedFiles.isEmpty()) {
           return null;
         }
@@ -1404,24 +1404,21 @@ public class HStore implements Store {
         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
 
         // If we're enqueuing a major, clear the force flag.
-        boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
-        this.forceMajor = this.forceMajor && !isMajor;
+        this.forceMajor = this.forceMajor && !request.isMajor();
 
         // Set common request properties.
         // Set priority, either override value supplied by caller or from store.
-        compaction.getRequest().setPriority(
-            (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
-        compaction.getRequest().setIsMajor(isMajor);
-        compaction.getRequest().setDescription(
-            getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
+        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
+        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
       }
     } finally {
       this.lock.readLock().unlock();
     }
 
-    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
-        + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
-    this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
+    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+        + (request.isAllFiles() ? " (all files)" : ""));
+    this.region.reportCompactionRequestStart(request.isMajor());
     return compaction;
   }
@@ -1732,9 +1729,6 @@ public class HStore implements Store {
     this.forceMajor = true;
   }
 
-  boolean getForceMajorCompaction() {
-    return this.forceMajor;
-  }
 
   //////////////////////////////////////////////////////////////////////////////
   // File administration
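The reworked completion message keeps "major" for genuinely major compactions and adds an "(all)" marker for compactions that happened to cover every store file. A small sketch of the prefix the new concatenation produces; the helper class and values are illustrative, and under the new model isMajor() implies isAllFiles(), so "major but not all files" cannot occur:

public class CompactionLogPrefix {
  // Mirrors the string concatenation in the hunk above; not HBase code.
  static String prefix(boolean isMajor, boolean isAllFiles, int files) {
    return "Completed" + (isMajor ? " major" : "") + " compaction of "
        + files + (isAllFiles ? " (all)" : "") + " file(s)";
  }

  public static void main(String[] args) {
    System.out.println(prefix(false, false, 3)); // Completed compaction of 3 file(s)
    System.out.println(prefix(false, true, 5));  // Completed compaction of 5 (all) file(s)
    System.out.println(prefix(true, true, 5));   // Completed major compaction of 5 (all) file(s)
  }
}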
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index 3e75b8b76c9..35fc44fda17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -45,7 +45,8 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
 
   static final Log LOG = LogFactory.getLog(CompactionRequest.class);
   // was this compaction promoted to an off-peak
   private boolean isOffPeak = false;
-  private boolean isMajor = false;
+  private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
+  private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
   private int priority = Store.NO_PRIORITY;
   private Collection<StoreFile> filesToCompact;
@@ -156,8 +157,13 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
     return totalSize;
   }
 
+  public boolean isAllFiles() {
+    return this.isMajor == DisplayCompactionType.MAJOR
+        || this.isMajor == DisplayCompactionType.ALL_FILES;
+  }
+
   public boolean isMajor() {
-    return this.isMajor;
+    return this.isMajor == DisplayCompactionType.MAJOR;
   }
 
   /** Gets the priority for the request */
@@ -187,8 +193,10 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
    * @param isMajor true if the system determines that this compaction should be a major
    *   compaction
    */
-  public void setIsMajor(boolean isMajor) {
-    this.isMajor = isMajor;
+  public void setIsMajor(boolean isMajor, boolean isAllFiles) {
+    assert isAllFiles || !isMajor;
+    this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
+        : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
   }
 
   @Override
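This is the core of the change: the old boolean conflated "a major compaction was requested" with "all files were selected, so deletes can be dropped". The enum separates three states: MINOR, ALL_FILES (whole store selected but not promoted to major), and MAJOR (whole store selected and requested or promoted as major). A self-contained restatement of the mapping for reference; the class name is illustrative, the logic mirrors the patch:

public class CompactionKind {
  private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
  private DisplayCompactionType state = DisplayCompactionType.MINOR;

  public void setIsMajor(boolean isMajor, boolean isAllFiles) {
    assert isAllFiles || !isMajor; // a major compaction must include all files
    this.state = !isAllFiles ? DisplayCompactionType.MINOR
        : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
  }

  public boolean isAllFiles() {
    return state == DisplayCompactionType.MAJOR
        || state == DisplayCompactionType.ALL_FILES;
  }

  public boolean isMajor() {
    return state == DisplayCompactionType.MAJOR;
  }

  public static void main(String[] args) {
    CompactionKind kind = new CompactionKind();
    kind.setIsMajor(false, true); // all files selected, but not a "real" major
    System.out.println(kind.isAllFiles() + " " + kind.isMajor()); // true false
  }
}

Keeping isMajor() a strict subset of isAllFiles() lets callers such as DefaultCompactor key delete-dropping off isAllFiles() while metrics and the forceMajor flag retain their old major-only meaning.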
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 7a479d89711..3e8523d9c92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -45,7 +45,7 @@ public class DefaultCompactor extends Compactor {
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
    */
   public List<Path> compact(final CompactionRequest request) throws IOException {
-    FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
+    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
     // Find the smallest read point across all the Scanners.
@@ -57,9 +57,9 @@ public class DefaultCompactor extends Compactor {
     try {
       InternalScanner scanner = null;
       try {
-        /* Include deletes, unless we are doing a major compaction */
+        /* Include deletes, unless we are doing a compaction of all files */
         ScanType scanType =
-          request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
+          request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
         scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
         if (scanner == null) {
           scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
@@ -89,7 +89,7 @@ public class DefaultCompactor extends Compactor {
       }
     } finally {
       if (writer != null) {
-        writer.appendMetadata(fd.maxSeqId, request.isMajor());
+        writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
         writer.close();
         newFiles.add(writer.getPath());
       }
@@ -110,7 +110,7 @@ public class DefaultCompactor extends Compactor {
   public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
       throws IOException {
     CompactionRequest cr = new CompactionRequest(filesToCompact);
-    cr.setIsMajor(isMajor);
+    cr.setIsMajor(isMajor, isMajor);
     return this.compact(cr);
   }
 }
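Switching the ScanType decision (and the appendMetadata() flag) from isMajor() to isAllFiles() is the substantive fix in this file: tombstones may only be dropped when every file in the store participates, because a delete marker removed early would un-mask older versions still present in files outside the compaction. The decision isolated as a sketch; the class and method names are illustrative, the enum constants are copied from the hunk:

public class ScanTypeChoice {
  enum ScanType { COMPACT_DROP_DELETES, COMPACT_RETAIN_DELETES }

  // Dropping deletes is only safe when no masked, older cell can survive in a
  // store file that is not part of this compaction.
  static ScanType chooseScanType(boolean isAllFiles) {
    return isAllFiles ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
  }

  public static void main(String[] args) {
    System.out.println(chooseScanType(true));  // COMPACT_DROP_DELETES
    System.out.println(chooseScanType(false)); // COMPACT_RETAIN_DELETES
  }
}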
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index bc992260c5b..b7872347d5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -73,7 +73,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
   }
 
   /**
-   * @param candidateFiles candidate files, ordered from oldest to newest
+   * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
    */
@@ -93,9 +93,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
         filesCompacting.size() + " compacting, " + candidateSelection.size() +
         " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
 
-    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
-    if (!forceMajor) {
+    // If we can't have all files, we cannot do major anyway
+    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
+    if (!(forceMajor && isAllFiles)) {
       // If there are expired files, only select them so that compaction deletes them
+      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
       if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
         ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
             candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
@@ -104,28 +106,28 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
        }
      }
      candidateSelection = skipLargeFiles(candidateSelection);
+      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }
 
-    // Force a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested
-    // as a major compaction.
+    // Try a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested as a major compaction
+    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
+        || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
+          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
+    // Or, if there are any references among the candidates.
-    boolean majorCompaction = (
-      (forceMajor && isUserCompaction)
-      || ((forceMajor || isMajorCompaction(candidateSelection))
-          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
-      || StoreUtils.hasReferences(candidateSelection)
-    );
-
-    if (!majorCompaction) {
-      // we're doing a minor compaction, let's see what files are applicable
+    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
+    if (!isTryingMajor && !isAfterSplit) {
+      // We are not compacting all files, let's see what files are applicable
       candidateSelection = filterBulk(candidateSelection);
       candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
       candidateSelection = checkMinFilesCriteria(candidateSelection);
     }
-    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
+    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
+    // Now we have the final file list, so we can determine if we can do major/all files.
+    isAllFiles = (candidateFiles.size() == candidateSelection.size());
     CompactionRequest result = new CompactionRequest(candidateSelection);
-    result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
+    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
+    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
     return result;
   }
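Net effect of the policy changes: isAllFiles is recomputed after each filtering step, and the final request is marked major only when a major was attempted and the surviving selection still covers the whole store, while an all-files selection that was never "major" still permits delete-dropping. A condensed sketch of that final classification; the class, method, and inputs are illustrative:

public class SelectionOutcome {
  // Condenses the tail of selectCompaction above; not HBase code.
  static String classify(int storeFiles, int selected, boolean triedMajor) {
    boolean isAllFiles = (storeFiles == selected);
    boolean isMajor = triedMajor && isAllFiles;
    return "isMajor=" + isMajor + ", isAllFiles=" + isAllFiles;
  }

  public static void main(String[] args) {
    System.out.println(classify(10, 10, true));  // isMajor=true, isAllFiles=true
    System.out.println(classify(10, 10, false)); // isMajor=false, isAllFiles=true: may drop deletes
    System.out.println(classify(10, 7, true));   // isMajor=false, isAllFiles=false: demoted to minor
  }
}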