Revert "HBASE-27152 Under compaction mark may leak (#4568)"
This reverts commit 3ebb4360b9
.
This commit is contained in:
parent
3ebb4360b9
commit
cf4da36954
|
@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
import java.util.concurrent.RejectedExecutionHandler;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -144,15 +145,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
||||||
final String n = Thread.currentThread().getName();
|
final String n = Thread.currentThread().getName();
|
||||||
|
|
||||||
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
|
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
|
||||||
// Since the StealJobQueue is unbounded, we need not to set the RejectedExecutionHandler for
|
|
||||||
// the long and short compaction thread pool executors.
|
|
||||||
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
|
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
|
||||||
stealJobQueue,
|
stealJobQueue,
|
||||||
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
|
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
|
||||||
|
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
||||||
this.longCompactions.prestartAllCoreThreads();
|
this.longCompactions.prestartAllCoreThreads();
|
||||||
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
|
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
|
||||||
stealJobQueue.getStealFromQueue(),
|
stealJobQueue.getStealFromQueue(),
|
||||||
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
|
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
|
||||||
|
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -381,20 +382,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
||||||
// pool; we will do selection there, and move to large pool if necessary.
|
// pool; we will do selection there, and move to large pool if necessary.
|
||||||
pool = shortCompactions;
|
pool = shortCompactions;
|
||||||
}
|
}
|
||||||
|
pool.execute(
|
||||||
// A simple implementation for under compaction marks.
|
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
|
||||||
// Since this method is always called in the synchronized methods, we do not need to use the
|
|
||||||
// boolean result to make sure that exactly the one that added here will be removed
|
|
||||||
// in the next steps.
|
|
||||||
underCompactionStores.add(getStoreNameForUnderCompaction(store));
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Add compact mark for store {}, priority={}, current under compaction "
|
"Add compact mark for store {}, priority={}, current under compaction "
|
||||||
+ "store size is {}",
|
+ "store size is {}",
|
||||||
getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
|
getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
|
||||||
}
|
}
|
||||||
pool.submit(
|
underCompactionStores.add(getStoreNameForUnderCompaction(store));
|
||||||
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
|
|
||||||
region.incrementCompactionsQueuedCount();
|
region.incrementCompactionsQueuedCount();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
String type = (pool == shortCompactions) ? "Small " : "Large ";
|
String type = (pool == shortCompactions) ? "Small " : "Large ";
|
||||||
|
@ -723,6 +719,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup class to use when rejecting a compaction request from the queue.
|
||||||
|
*/
|
||||||
|
private static class Rejection implements RejectedExecutionHandler {
|
||||||
|
@Override
|
||||||
|
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
|
||||||
|
if (runnable instanceof CompactionRunner) {
|
||||||
|
CompactionRunner runner = (CompactionRunner) runnable;
|
||||||
|
LOG.debug("Compaction Rejected: " + runner);
|
||||||
|
if (runner.compaction != null) {
|
||||||
|
runner.store.cancelRequestedCompaction(runner.compaction);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue