diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 8a6e806292a..8cbded31f54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -145,15 +144,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati final String n = Thread.currentThread().getName(); StealJobQueue stealJobQueue = new StealJobQueue(COMPARATOR); + // Since the StealJobQueue is unbounded, we need not to set the RejectedExecutionHandler for + // the long and short compaction thread pool executors. this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); - this.longCompactions.setRejectedExecutionHandler(new Rejection()); this.longCompactions.prestartAllCoreThreads(); this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); - this.shortCompactions.setRejectedExecutionHandler(new Rejection()); } @Override @@ -382,15 +381,20 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati // pool; we will do selection there, and move to large pool if necessary. pool = shortCompactions; } - pool.execute( - new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); + + // A simple implementation for under compaction marks. + // Since this method is always called in the synchronized methods, we do not need to use the + // boolean result to make sure that exactly the one that added here will be removed + // in the next steps. + underCompactionStores.add(getStoreNameForUnderCompaction(store)); if (LOG.isDebugEnabled()) { LOG.debug( "Add compact mark for store {}, priority={}, current under compaction " + "store size is {}", getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); } - underCompactionStores.add(getStoreNameForUnderCompaction(store)); + pool.submit( + new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); region.incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; @@ -719,22 +723,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } - /** - * Cleanup class to use when rejecting a compaction request from the queue. - */ - private static class Rejection implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { - if (runnable instanceof CompactionRunner) { - CompactionRunner runner = (CompactionRunner) runnable; - LOG.debug("Compaction Rejected: " + runner); - if (runner.compaction != null) { - runner.store.cancelRequestedCompaction(runner.compaction); - } - } - } - } - /** * {@inheritDoc} */