HBASE-27152 Under compaction mark may leak (#4568)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Xiaolin Ha 2022-07-20 08:49:10 +08:00 committed by GitHub
parent 075b3053cf
commit da27a67a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 22 deletions

View File

@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -145,15 +144,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
final String n = Thread.currentThread().getName(); final String n = Thread.currentThread().getName();
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
// Since the StealJobQueue is unbounded, we need not to set the RejectedExecutionHandler for
// the long and short compaction thread pool executors.
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
stealJobQueue, stealJobQueue,
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads(); this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
stealJobQueue.getStealFromQueue(), stealJobQueue.getStealFromQueue(),
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
} }
@Override @Override
@ -382,15 +381,20 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
// pool; we will do selection there, and move to large pool if necessary. // pool; we will do selection there, and move to large pool if necessary.
pool = shortCompactions; pool = shortCompactions;
} }
pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); // A simple implementation for under compaction marks.
// Since this method is always called in the synchronized methods, we do not need to use the
// boolean result to make sure that exactly the one that added here will be removed
// in the next steps.
underCompactionStores.add(getStoreNameForUnderCompaction(store));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug( LOG.debug(
"Add compact mark for store {}, priority={}, current under compaction " "Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}", + "store size is {}",
getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
} }
underCompactionStores.add(getStoreNameForUnderCompaction(store)); pool.submit(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
region.incrementCompactionsQueuedCount(); region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large "; String type = (pool == shortCompactions) ? "Small " : "Large ";
@ -719,22 +723,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
} }
} }
/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
private static class Rejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
if (runnable instanceof CompactionRunner) {
CompactionRunner runner = (CompactionRunner) runnable;
LOG.debug("Compaction Rejected: " + runner);
if (runner.compaction != null) {
runner.store.cancelRequestedCompaction(runner.compaction);
}
}
}
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */