HBASE-17972 Remove mergePool from CompactSplitThread (Guangxu Cheng)
This commit is contained in:
parent
b401a35fdc
commit
5411d3ecb1
|
@ -72,10 +72,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
// Configuration key for split threads
|
||||
public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
|
||||
public final static int SPLIT_THREADS_DEFAULT = 1;
|
||||
|
||||
// Configuration keys for merge threads
|
||||
public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
|
||||
public final static int MERGE_THREADS_DEFAULT = 1;
|
||||
|
||||
public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
|
||||
"hbase.regionserver.regionSplitLimit";
|
||||
|
@ -87,7 +83,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
private final ThreadPoolExecutor longCompactions;
|
||||
private final ThreadPoolExecutor shortCompactions;
|
||||
private final ThreadPoolExecutor splits;
|
||||
private final ThreadPoolExecutor mergePool;
|
||||
|
||||
private volatile ThroughputController compactionThroughputController;
|
||||
|
||||
|
@ -150,15 +145,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
|
||||
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
|
||||
mergeThreads, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-merges-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
|
||||
// compaction throughput controller
|
||||
this.compactionThroughputController =
|
||||
|
@ -170,8 +156,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
return "compaction_queue=("
|
||||
+ longCompactions.getQueue().size() + ":"
|
||||
+ shortCompactions.getQueue().size() + ")"
|
||||
+ ", split_queue=" + splits.getQueue().size()
|
||||
+ ", merge_queue=" + mergePool.getQueue().size();
|
||||
+ ", split_queue=" + splits.getQueue().size();
|
||||
}
|
||||
|
||||
public String dumpQueue() {
|
||||
|
@ -205,15 +190,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
queueLists.append("\n");
|
||||
}
|
||||
|
||||
queueLists.append("\n");
|
||||
queueLists.append(" Region Merge Queue:\n");
|
||||
lq = mergePool.getQueue();
|
||||
it = lq.iterator();
|
||||
while (it.hasNext()) {
|
||||
queueLists.append(" " + it.next().toString());
|
||||
queueLists.append("\n");
|
||||
}
|
||||
|
||||
return queueLists.toString();
|
||||
}
|
||||
|
||||
|
@ -372,7 +348,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
*/
|
||||
void interruptIfNecessary() {
|
||||
splits.shutdown();
|
||||
mergePool.shutdown();
|
||||
longCompactions.shutdown();
|
||||
shortCompactions.shutdown();
|
||||
}
|
||||
|
@ -394,7 +369,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
|
||||
void join() {
|
||||
waitFor(splits, "Split Thread");
|
||||
waitFor(mergePool, "Merge Thread");
|
||||
waitFor(longCompactions, "Large Compaction Thread");
|
||||
waitFor(shortCompactions, "Small Compaction Thread");
|
||||
}
|
||||
|
@ -641,21 +615,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
}
|
||||
}
|
||||
|
||||
int mergeThreads = newConf.getInt(MERGE_THREADS,
|
||||
MERGE_THREADS_DEFAULT);
|
||||
if (this.mergePool.getCorePoolSize() != mergeThreads) {
|
||||
LOG.info("Changing the value of " + MERGE_THREADS +
|
||||
" from " + this.mergePool.getCorePoolSize() + " to " +
|
||||
mergeThreads);
|
||||
if(this.mergePool.getCorePoolSize() < mergeThreads) {
|
||||
this.mergePool.setMaximumPoolSize(mergeThreads);
|
||||
this.mergePool.setCorePoolSize(mergeThreads);
|
||||
} else {
|
||||
this.mergePool.setCorePoolSize(mergeThreads);
|
||||
this.mergePool.setMaximumPoolSize(mergeThreads);
|
||||
}
|
||||
}
|
||||
|
||||
ThroughputController old = this.compactionThroughputController;
|
||||
if (old != null) {
|
||||
old.stop("configuration change");
|
||||
|
@ -680,10 +639,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
return this.splits.getCorePoolSize();
|
||||
}
|
||||
|
||||
protected int getMergeThreadNum() {
|
||||
return this.mergePool.getCorePoolSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
@ -705,11 +660,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
return compactionThroughputController;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getCompletedMergeTaskCount() {
|
||||
return mergePool.getCompletedTaskCount();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
/**
|
||||
* Shutdown the long compaction thread pool.
|
||||
|
|
|
@ -80,7 +80,6 @@ public class TestCompactSplitThread {
|
|||
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
|
||||
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
|
||||
conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
|
||||
conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -113,13 +112,11 @@ public class TestCompactSplitThread {
|
|||
assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
|
||||
assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
|
||||
assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
|
||||
assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum());
|
||||
|
||||
// change bigger configurations and do online update
|
||||
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4);
|
||||
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5);
|
||||
conf.setInt(CompactSplitThread.SPLIT_THREADS, 6);
|
||||
conf.setInt(CompactSplitThread.MERGE_THREADS, 7);
|
||||
try {
|
||||
regionServer.compactSplitThread.onConfigurationChange(conf);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
|
@ -130,13 +127,11 @@ public class TestCompactSplitThread {
|
|||
assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
|
||||
assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
|
||||
assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
|
||||
assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum());
|
||||
|
||||
// change smaller configurations and do online update
|
||||
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2);
|
||||
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3);
|
||||
conf.setInt(CompactSplitThread.SPLIT_THREADS, 4);
|
||||
conf.setInt(CompactSplitThread.MERGE_THREADS, 5);
|
||||
try {
|
||||
regionServer.compactSplitThread.onConfigurationChange(conf);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
|
@ -147,7 +142,6 @@ public class TestCompactSplitThread {
|
|||
assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
|
||||
assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
|
||||
assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
|
||||
assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
|
||||
} finally {
|
||||
conn.close();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue