HBASE-27332 Remove RejectedExecutionHandler for long/short compaction thread pools (#4731)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
3d39482339
commit
72c1a84750
|
@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.RejectedExecutionHandler;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -145,15 +144,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
||||||
final String n = Thread.currentThread().getName();
|
final String n = Thread.currentThread().getName();
|
||||||
|
|
||||||
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
|
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
|
||||||
|
// Since the StealJobQueue inner uses the PriorityBlockingQueue,
|
||||||
|
// which is an unbounded blocking queue, we remove the RejectedExecutionHandler for
|
||||||
|
// the long and short compaction thread pool executors since HBASE-27332.
|
||||||
|
// If anyone who what to change the StealJobQueue to a bounded queue,
|
||||||
|
// please add the rejection handler back.
|
||||||
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
|
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
|
||||||
stealJobQueue,
|
stealJobQueue,
|
||||||
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
|
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
|
||||||
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
|
||||||
this.longCompactions.prestartAllCoreThreads();
|
this.longCompactions.prestartAllCoreThreads();
|
||||||
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
|
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
|
||||||
stealJobQueue.getStealFromQueue(),
|
stealJobQueue.getStealFromQueue(),
|
||||||
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
|
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
|
||||||
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -724,22 +726,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Cleanup class to use when rejecting a compaction request from the queue.
|
|
||||||
*/
|
|
||||||
private static class Rejection implements RejectedExecutionHandler {
|
|
||||||
@Override
|
|
||||||
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
|
|
||||||
if (runnable instanceof CompactionRunner) {
|
|
||||||
CompactionRunner runner = (CompactionRunner) runnable;
|
|
||||||
LOG.debug("Compaction Rejected: " + runner);
|
|
||||||
if (runner.compaction != null) {
|
|
||||||
runner.store.cancelRequestedCompaction(runner.compaction);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue