From f7f2d7e1ae8ece753e00e163871b2aa0428f3257 Mon Sep 17 00:00:00 2001 From: Xiaolin Ha Date: Mon, 5 Sep 2022 21:13:34 +0800 Subject: [PATCH] HBASE-27332 Remove RejectedExecutionHandler for long/short compaction thread pools (#4731) Signed-off-by: Duo Zhang --- .../hbase/regionserver/CompactSplit.java | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 183893fa2f1..1963b8f07e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -29,7 +29,6 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -133,15 +132,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati final String n = Thread.currentThread().getName(); StealJobQueue stealJobQueue = new StealJobQueue(COMPARATOR); + // Since the StealJobQueue inner uses the PriorityBlockingQueue, + // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for + // the long and short compaction thread pool executors since HBASE-27332. + // If anyone who what to change the StealJobQueue to a bounded queue, + // please add the rejection handler back. this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); - this.longCompactions.setRejectedExecutionHandler(new Rejection()); this.longCompactions.prestartAllCoreThreads(); this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); - this.shortCompactions.setRejectedExecutionHandler(new Rejection()); } @Override @@ -673,22 +675,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } - /** - * Cleanup class to use when rejecting a compaction request from the queue. - */ - private static class Rejection implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { - if (runnable instanceof CompactionRunner) { - CompactionRunner runner = (CompactionRunner) runnable; - LOG.debug("Compaction Rejected: " + runner); - if (runner.compaction != null) { - runner.store.cancelRequestedCompaction(runner.compaction); - } - } - } - } - /** * {@inheritDoc} */