HBASE-25779 HRegionServer#compactSplitThread should be private

Minor refactor. Make the `compactSplitThread` member field of `HRegionServer` private, and gate
all access through the getter method.

Signed-off-by: Yulin Niu <niuyulin@apache.org>
Signed-off-by: Pankaj Kumar <pankajkumar@apache.org>
This commit is contained in:
Nick Dimiduk 2021-04-15 16:41:38 -07:00 committed by Nick Dimiduk
parent 2382f68b23
commit b061b0c4ed
10 changed files with 58 additions and 64 deletions

View File

@ -300,7 +300,7 @@ public class HRegionServer extends Thread implements
private boolean sameReplicationSourceAndSink; private boolean sameReplicationSourceAndSink;
// Compactions // Compactions
public CompactSplit compactSplitThread; private CompactSplit compactSplitThread;
/** /**
* Map of regions currently being served by this region server. Key is the * Map of regions currently being served by this region server. Key is the

View File

@ -581,9 +581,10 @@ class MemStoreFlusher implements FlushRequester {
LOG.warn("{} has too many store files({}); delaying flush up to {} ms", LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
region.getRegionInfo().getEncodedName(), getStoreFileCount(region), region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
this.blockingWaitTime); this.blockingWaitTime);
if (!this.server.compactSplitThread.requestSplit(region)) { final CompactSplit compactSplitThread = server.getCompactSplitThread();
if (!compactSplitThread.requestSplit(region)) {
try { try {
this.server.compactSplitThread.requestSystemCompaction(region, compactSplitThread.requestSystemCompaction(region,
Thread.currentThread().getName()); Thread.currentThread().getName());
} catch (IOException e) { } catch (IOException e) {
e = e instanceof RemoteException ? e = e instanceof RemoteException ?
@ -630,6 +631,7 @@ class MemStoreFlusher implements FlushRequester {
tracker.beforeExecution(); tracker.beforeExecution();
lock.readLock().lock(); lock.readLock().lock();
final CompactSplit compactSplitThread = server.getCompactSplitThread();
try { try {
notifyFlushRequest(region, emergencyFlush); notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flushcache(families, false, tracker); FlushResult flushResult = region.flushcache(families, false, tracker);
@ -637,9 +639,9 @@ class MemStoreFlusher implements FlushRequester {
// We just want to check the size // We just want to check the size
boolean shouldSplit = region.checkSplit().isPresent(); boolean shouldSplit = region.checkSplit().isPresent();
if (shouldSplit) { if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region); compactSplitThread.requestSplit(region);
} else if (shouldCompact) { } else if (shouldCompact) {
server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
} }
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical // Cache flush can fail in a few places. If it fails in a critical

View File

@ -235,37 +235,26 @@ class MetricsRegionServerWrapperImpl
@Override @Override
public int getSplitQueueSize() { public int getSplitQueueSize() {
if (this.regionServer.compactSplitThread == null) { final CompactSplit compactSplit = regionServer.getCompactSplitThread();
return 0; return compactSplit == null ? 0 : compactSplit.getSplitQueueSize();
}
return this.regionServer.compactSplitThread.getSplitQueueSize();
} }
@Override @Override
public int getCompactionQueueSize() { public int getCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue. final CompactSplit compactSplit = regionServer.getCompactSplitThread();
if (this.regionServer.compactSplitThread == null) { return compactSplit == null ? 0 : compactSplit.getCompactionQueueSize();
return 0;
}
return this.regionServer.compactSplitThread.getCompactionQueueSize();
} }
@Override @Override
public int getSmallCompactionQueueSize() { public int getSmallCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue. final CompactSplit compactSplit = regionServer.getCompactSplitThread();
if (this.regionServer.compactSplitThread == null) { return compactSplit == null ? 0 : compactSplit.getSmallCompactionQueueSize();
return 0;
}
return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
} }
@Override @Override
public int getLargeCompactionQueueSize() { public int getLargeCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue. final CompactSplit compactSplit = regionServer.getCompactSplitThread();
if (this.regionServer.compactSplitThread == null) { return compactSplit == null ? 0 : compactSplit.getLargeCompactionQueueSize();
return 0;
}
return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
} }
@Override @Override

View File

@ -124,19 +124,20 @@ public class RSDumpServlet extends StateDumpServlet {
} }
} }
public static void dumpQueue(HRegionServer hrs, PrintWriter out) public static void dumpQueue(HRegionServer hrs, PrintWriter out) {
throws IOException { final CompactSplit compactSplit = hrs.getCompactSplitThread();
if (hrs.compactSplitThread != null) { if (compactSplit != null) {
// 1. Print out Compaction/Split Queue // 1. Print out Compaction/Split Queue
out.println("Compaction/Split Queue summary: " out.println("Compaction/Split Queue summary: " + compactSplit);
+ hrs.compactSplitThread.toString() ); out.println(compactSplit.dumpQueue());
out.println(hrs.compactSplitThread.dumpQueue());
} }
if (hrs.getMemStoreFlusher() != null) { final MemStoreFlusher memStoreFlusher = hrs.getMemStoreFlusher();
if (memStoreFlusher != null) {
// 2. Print out flush Queue // 2. Print out flush Queue
out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString()); out.println();
out.println(hrs.getMemStoreFlusher().dumpQueue()); out.println("Flush Queue summary: " + memStoreFlusher);
out.println(memStoreFlusher.dumpQueue());
} }
} }

View File

@ -1711,17 +1711,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@Override @Override
public CompactionSwitchResponse compactionSwitch(RpcController controller, public CompactionSwitchResponse compactionSwitch(RpcController controller,
CompactionSwitchRequest request) throws ServiceException { CompactionSwitchRequest request) throws ServiceException {
final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
try { try {
checkOpen(); checkOpen();
requestCount.increment(); requestCount.increment();
boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled(); boolean prevState = compactSplitThread.isCompactionsEnabled();
CompactionSwitchResponse response = CompactionSwitchResponse response =
CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
if (prevState == request.getEnabled()) { if (prevState == request.getEnabled()) {
// passed in requested state is same as current state. No action required // passed in requested state is same as current state. No action required
return response; return response;
} }
regionServer.compactSplitThread.switchCompaction(request.getEnabled()); compactSplitThread.switchCompaction(request.getEnabled());
return response; return response;
} catch (IOException ie) { } catch (IOException ie) {
throw new ServiceException(ie); throw new ServiceException(ie);
@ -1764,7 +1765,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
boolean compactionNeeded = flushResult.isCompactionNeeded(); boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) { if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region, regionServer.getCompactSplitThread().requestSystemCompaction(region,
"Compaction through user triggered flush"); "Compaction through user triggered flush");
} }
builder.setFlushed(flushResult.isFlushSucceeded()); builder.setFlushed(flushResult.isFlushSucceeded());
@ -1880,6 +1881,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
requestCount.increment(); requestCount.increment();
if (clearCompactionQueues.compareAndSet(false,true)) { if (clearCompactionQueues.compareAndSet(false,true)) {
final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
try { try {
checkOpen(); checkOpen();
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues(); regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
@ -1887,10 +1889,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.debug("clear " + queueName + " compaction queue"); LOG.debug("clear " + queueName + " compaction queue");
switch (queueName) { switch (queueName) {
case "long": case "long":
regionServer.compactSplitThread.clearLongCompactionsQueue(); compactSplitThread.clearLongCompactionsQueue();
break; break;
case "short": case "short":
regionServer.compactSplitThread.clearShortCompactionsQueue(); compactSplitThread.clearShortCompactionsQueue();
break; break;
default: default:
LOG.warn("Unknown queue name " + queueName); LOG.warn("Unknown queue name " + queueName);

View File

@ -240,11 +240,11 @@ public class TestChangingEncoding {
final long maxWaitime = System.currentTimeMillis() + 500; final long maxWaitime = System.currentTimeMillis() + 500;
boolean cont; boolean cont;
do { do {
cont = rs.compactSplitThread.getCompactionQueueSize() == 0; cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
Threads.sleep(1); Threads.sleep(1);
} while (cont && System.currentTimeMillis() < maxWaitime); } while (cont && System.currentTimeMillis() < maxWaitime);
while (rs.compactSplitThread.getCompactionQueueSize() > 0) { while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
Threads.sleep(1); Threads.sleep(1);
} }
LOG.debug("Compaction queue size reached 0, continuing"); LOG.debug("Compaction queue size reached 0, continuing");

View File

@ -102,7 +102,7 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential
// Wait until compaction completes // Wait until compaction completes
Threads.sleepWithoutInterrupt(5000); Threads.sleepWithoutInterrupt(5000);
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
while (rs.compactSplitThread.getCompactionQueueSize() > 0) { while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
Threads.sleep(50); Threads.sleep(50);
} }

View File

@ -120,39 +120,39 @@ public class TestCompactSplitThread {
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
// check initial configuration of thread pool sizes // check initial configuration of thread pool sizes
assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum()); assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());
// change bigger configurations and do online update // change bigger configurations and do online update
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4); conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5); conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
conf.setInt(CompactSplit.SPLIT_THREADS, 6); conf.setInt(CompactSplit.SPLIT_THREADS, 6);
try { try {
regionServer.compactSplitThread.onConfigurationChange(conf); regionServer.getCompactSplitThread().onConfigurationChange(conf);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
Assert.fail("Update bigger configuration failed!"); Assert.fail("Update bigger configuration failed!");
} }
// check again after online update // check again after online update
assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum()); assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());
// change smaller configurations and do online update // change smaller configurations and do online update
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2); conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3); conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
conf.setInt(CompactSplit.SPLIT_THREADS, 4); conf.setInt(CompactSplit.SPLIT_THREADS, 4);
try { try {
regionServer.compactSplitThread.onConfigurationChange(conf); regionServer.getCompactSplitThread().onConfigurationChange(conf);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
Assert.fail("Update smaller configuration failed!"); Assert.fail("Update smaller configuration failed!");
} }
// check again after online update // check again after online update
assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum()); assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
} finally { } finally {
conn.close(); conn.close();
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
@ -102,15 +103,14 @@ public class TestRegionServerOnlineConfigChange {
/** /**
* Check if the number of compaction threads changes online * Check if the number of compaction threads changes online
* @throws IOException
*/ */
@Test @Test
public void testNumCompactionThreadsOnlineChange() throws IOException { public void testNumCompactionThreadsOnlineChange() {
assertTrue(rs1.compactSplitThread != null); assertNotNull(rs1.getCompactSplitThread());
int newNumSmallThreads = int newNumSmallThreads =
rs1.compactSplitThread.getSmallCompactionThreadNum() + 1; rs1.getCompactSplitThread().getSmallCompactionThreadNum() + 1;
int newNumLargeThreads = int newNumLargeThreads =
rs1.compactSplitThread.getLargeCompactionThreadNum() + 1; rs1.getCompactSplitThread().getLargeCompactionThreadNum() + 1;
conf.setInt("hbase.regionserver.thread.compaction.small", conf.setInt("hbase.regionserver.thread.compaction.small",
newNumSmallThreads); newNumSmallThreads);
@ -119,9 +119,9 @@ public class TestRegionServerOnlineConfigChange {
rs1.getConfigurationManager().notifyAllObservers(conf); rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newNumSmallThreads, assertEquals(newNumSmallThreads,
rs1.compactSplitThread.getSmallCompactionThreadNum()); rs1.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(newNumLargeThreads, assertEquals(newNumLargeThreads,
rs1.compactSplitThread.getLargeCompactionThreadNum()); rs1.getCompactSplitThread().getLargeCompactionThreadNum());
} }
/** /**

View File

@ -208,7 +208,7 @@ public class TestCompactionWithThroughputController {
TEST_UTIL.waitTableAvailable(tableName); TEST_UTIL.waitTableAvailable(tableName);
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
PressureAwareCompactionThroughputController throughputController = PressureAwareCompactionThroughputController throughputController =
(PressureAwareCompactionThroughputController) regionServer.compactSplitThread (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
.getCompactionThroughputController(); .getCompactionThroughputController();
assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
Table table = conn.getTable(tableName); Table table = conn.getTable(tableName);
@ -234,9 +234,9 @@ public class TestCompactionWithThroughputController {
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName()); NoLimitThroughputController.class.getName());
regionServer.compactSplitThread.onConfigurationChange(conf); regionServer.getCompactSplitThread().onConfigurationChange(conf);
assertTrue(throughputController.isStopped()); assertTrue(throughputController.isStopped());
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() assertTrue(regionServer.getCompactSplitThread().getCompactionThroughputController()
instanceof NoLimitThroughputController); instanceof NoLimitThroughputController);
} finally { } finally {
conn.close(); conn.close();