HBASE-22930 Set unique name to longCompactions/shortCompactions threads (#645)
This commit is contained in:
parent
daf1faabfd
commit
4cd51aed31
|
@ -31,6 +31,7 @@ import java.util.concurrent.RejectedExecutionHandler;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -133,9 +134,11 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
||||||
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
||||||
this.splits =
|
this.splits =
|
||||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
|
(ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
|
||||||
|
AtomicInteger splitThreadCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
String name = n + "-splits-" + System.currentTimeMillis();
|
String name = n + "-splits-" + splitThreadCounter.getAndIncrement();
|
||||||
return new Thread(r, name);
|
return new Thread(r, name);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -155,23 +158,25 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
||||||
final String n = Thread.currentThread().getName();
|
final String n = Thread.currentThread().getName();
|
||||||
|
|
||||||
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>();
|
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>();
|
||||||
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
|
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
|
||||||
TimeUnit.SECONDS, stealJobQueue,
|
stealJobQueue, new ThreadFactory() {
|
||||||
new ThreadFactory() {
|
AtomicInteger longCompactionThreadCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
String name = n + "-longCompactions-" + System.currentTimeMillis();
|
String name = n + "-longCompactions-" + longCompactionThreadCounter.getAndIncrement();
|
||||||
return new Thread(r, name);
|
return new Thread(r, name);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
||||||
this.longCompactions.prestartAllCoreThreads();
|
this.longCompactions.prestartAllCoreThreads();
|
||||||
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
|
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
|
||||||
TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
|
stealJobQueue.getStealFromQueue(), new ThreadFactory() {
|
||||||
new ThreadFactory() {
|
AtomicInteger shortCompactionThreadCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
String name = n + "-shortCompactions-" + System.currentTimeMillis();
|
String name = n + "-shortCompactions-" + shortCompactionThreadCounter.getAndIncrement();
|
||||||
return new Thread(r, name);
|
return new Thread(r, name);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue