HBASE-27332 Remove RejectedExecutionHandler for long/short compaction thread pools (#4731)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Xiaolin Ha 2022-09-05 21:13:34 +08:00 committed by sunhelly
parent acc657aa59
commit f7f2d7e1ae
1 changed files with 5 additions and 19 deletions

View File

@ -29,7 +29,6 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -133,15 +132,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
final String n = Thread.currentThread().getName();
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
// Since the StealJobQueue inner uses the PriorityBlockingQueue,
// which is an unbounded blocking queue, we remove the RejectedExecutionHandler for
// the long and short compaction thread pool executors since HBASE-27332.
// If anyone who what to change the StealJobQueue to a bounded queue,
// please add the rejection handler back.
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
stealJobQueue,
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
stealJobQueue.getStealFromQueue(),
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
}
@Override
@ -673,22 +675,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
}
/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
private static class Rejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
if (runnable instanceof CompactionRunner) {
CompactionRunner runner = (CompactionRunner) runnable;
LOG.debug("Compaction Rejected: " + runner);
if (runner.compaction != null) {
runner.store.cancelRequestedCompaction(runner.compaction);
}
}
}
}
/**
* {@inheritDoc}
*/