diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3386e07297b..36fdc77a56e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -300,7 +300,7 @@ public class HRegionServer extends Thread implements private boolean sameReplicationSourceAndSink; // Compactions - public CompactSplit compactSplitThread; + private CompactSplit compactSplitThread; /** * Map of regions currently being served by this region server. Key is the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 5e62f7aa871..17927ad90b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -581,9 +581,10 @@ class MemStoreFlusher implements FlushRequester { LOG.warn("{} has too many store files({}); delaying flush up to {} ms", region.getRegionInfo().getEncodedName(), getStoreFileCount(region), this.blockingWaitTime); - if (!this.server.compactSplitThread.requestSplit(region)) { + final CompactSplit compactSplitThread = server.getCompactSplitThread(); + if (!compactSplitThread.requestSplit(region)) { try { - this.server.compactSplitThread.requestSystemCompaction(region, + compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); } catch (IOException e) { e = e instanceof RemoteException ? @@ -630,6 +631,7 @@ class MemStoreFlusher implements FlushRequester { tracker.beforeExecution(); lock.readLock().lock(); + final CompactSplit compactSplitThread = server.getCompactSplitThread(); try { notifyFlushRequest(region, emergencyFlush); FlushResult flushResult = region.flushcache(families, false, tracker); @@ -637,9 +639,9 @@ class MemStoreFlusher implements FlushRequester { // We just want to check the size boolean shouldSplit = region.checkSplit().isPresent(); if (shouldSplit) { - this.server.compactSplitThread.requestSplit(region); + compactSplitThread.requestSplit(region); } else if (shouldCompact) { - server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); + compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 5a358bc0d8e..3da069dd833 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -235,37 +235,26 @@ class MetricsRegionServerWrapperImpl @Override public int getSplitQueueSize() { - if (this.regionServer.compactSplitThread == null) { - return 0; - } - return this.regionServer.compactSplitThread.getSplitQueueSize(); + final CompactSplit compactSplit = regionServer.getCompactSplitThread(); + return compactSplit == null ? 0 : compactSplit.getSplitQueueSize(); } @Override public int getCompactionQueueSize() { - //The thread could be zero. if so assume there is no queue. - if (this.regionServer.compactSplitThread == null) { - return 0; - } - return this.regionServer.compactSplitThread.getCompactionQueueSize(); + final CompactSplit compactSplit = regionServer.getCompactSplitThread(); + return compactSplit == null ? 0 : compactSplit.getCompactionQueueSize(); } @Override public int getSmallCompactionQueueSize() { - //The thread could be zero. if so assume there is no queue. - if (this.regionServer.compactSplitThread == null) { - return 0; - } - return this.regionServer.compactSplitThread.getSmallCompactionQueueSize(); + final CompactSplit compactSplit = regionServer.getCompactSplitThread(); + return compactSplit == null ? 0 : compactSplit.getSmallCompactionQueueSize(); } @Override public int getLargeCompactionQueueSize() { - //The thread could be zero. if so assume there is no queue. - if (this.regionServer.compactSplitThread == null) { - return 0; - } - return this.regionServer.compactSplitThread.getLargeCompactionQueueSize(); + final CompactSplit compactSplit = regionServer.getCompactSplitThread(); + return compactSplit == null ? 0 : compactSplit.getLargeCompactionQueueSize(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java index 11534674dcf..56b72e5661a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java @@ -124,19 +124,20 @@ public class RSDumpServlet extends StateDumpServlet { } } - public static void dumpQueue(HRegionServer hrs, PrintWriter out) - throws IOException { - if (hrs.compactSplitThread != null) { + public static void dumpQueue(HRegionServer hrs, PrintWriter out) { + final CompactSplit compactSplit = hrs.getCompactSplitThread(); + if (compactSplit != null) { // 1. Print out Compaction/Split Queue - out.println("Compaction/Split Queue summary: " - + hrs.compactSplitThread.toString() ); - out.println(hrs.compactSplitThread.dumpQueue()); + out.println("Compaction/Split Queue summary: " + compactSplit); + out.println(compactSplit.dumpQueue()); } - if (hrs.getMemStoreFlusher() != null) { + final MemStoreFlusher memStoreFlusher = hrs.getMemStoreFlusher(); + if (memStoreFlusher != null) { // 2. Print out flush Queue - out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString()); - out.println(hrs.getMemStoreFlusher().dumpQueue()); + out.println(); + out.println("Flush Queue summary: " + memStoreFlusher); + out.println(memStoreFlusher.dumpQueue()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9d9d5c7365a..79d77c7f296 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1711,17 +1711,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @Override public CompactionSwitchResponse compactionSwitch(RpcController controller, CompactionSwitchRequest request) throws ServiceException { + final CompactSplit compactSplitThread = regionServer.getCompactSplitThread(); try { checkOpen(); requestCount.increment(); - boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled(); + boolean prevState = compactSplitThread.isCompactionsEnabled(); CompactionSwitchResponse response = CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); if (prevState == request.getEnabled()) { // passed in requested state is same as current state. No action required return response; } - regionServer.compactSplitThread.switchCompaction(request.getEnabled()); + compactSplitThread.switchCompaction(request.getEnabled()); return response; } catch (IOException ie) { throw new ServiceException(ie); @@ -1764,7 +1765,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } boolean compactionNeeded = flushResult.isCompactionNeeded(); if (compactionNeeded) { - regionServer.compactSplitThread.requestSystemCompaction(region, + regionServer.getCompactSplitThread().requestSystemCompaction(region, "Compaction through user triggered flush"); } builder.setFlushed(flushResult.isFlushSucceeded()); @@ -1880,6 +1881,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); requestCount.increment(); if (clearCompactionQueues.compareAndSet(false,true)) { + final CompactSplit compactSplitThread = regionServer.getCompactSplitThread(); try { checkOpen(); regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues(); @@ -1887,10 +1889,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, LOG.debug("clear " + queueName + " compaction queue"); switch (queueName) { case "long": - regionServer.compactSplitThread.clearLongCompactionsQueue(); + compactSplitThread.clearLongCompactionsQueue(); break; case "short": - regionServer.compactSplitThread.clearShortCompactionsQueue(); + compactSplitThread.clearShortCompactionsQueue(); break; default: LOG.warn("Unknown queue name " + queueName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java index c3f4e1c9f04..c6b8ddc5c59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java @@ -240,11 +240,11 @@ public class TestChangingEncoding { final long maxWaitime = System.currentTimeMillis() + 500; boolean cont; do { - cont = rs.compactSplitThread.getCompactionQueueSize() == 0; + cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0; Threads.sleep(1); } while (cont && System.currentTimeMillis() < maxWaitime); - while (rs.compactSplitThread.getCompactionQueueSize() > 0) { + while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) { Threads.sleep(1); } LOG.debug("Compaction queue size reached 0, continuing"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java index 2d04fb35e56..7ef8052abcf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java @@ -102,7 +102,7 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential // Wait until compaction completes Threads.sleepWithoutInterrupt(5000); HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); - while (rs.compactSplitThread.getCompactionQueueSize() > 0) { + while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) { Threads.sleep(50); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java index f191e483681..06fbf8959e3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java @@ -120,39 +120,39 @@ public class TestCompactSplitThread { HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); // check initial configuration of thread pool sizes - assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum()); - assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum()); - assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum()); + assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum()); + assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum()); + assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum()); // change bigger configurations and do online update conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4); conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5); conf.setInt(CompactSplit.SPLIT_THREADS, 6); try { - regionServer.compactSplitThread.onConfigurationChange(conf); + regionServer.getCompactSplitThread().onConfigurationChange(conf); } catch (IllegalArgumentException iae) { Assert.fail("Update bigger configuration failed!"); } // check again after online update - assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum()); - assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum()); - assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum()); + assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum()); + assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum()); + assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum()); // change smaller configurations and do online update conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2); conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3); conf.setInt(CompactSplit.SPLIT_THREADS, 4); try { - regionServer.compactSplitThread.onConfigurationChange(conf); + regionServer.getCompactSplitThread().onConfigurationChange(conf); } catch (IllegalArgumentException iae) { Assert.fail("Update smaller configuration failed!"); } // check again after online update - assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum()); - assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum()); - assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum()); + assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum()); + assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum()); + assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum()); } finally { conn.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java index 79c043e69dc..b5c55be2166 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -102,26 +103,25 @@ public class TestRegionServerOnlineConfigChange { /** * Check if the number of compaction threads changes online - * @throws IOException */ @Test - public void testNumCompactionThreadsOnlineChange() throws IOException { - assertTrue(rs1.compactSplitThread != null); + public void testNumCompactionThreadsOnlineChange() { + assertNotNull(rs1.getCompactSplitThread()); int newNumSmallThreads = - rs1.compactSplitThread.getSmallCompactionThreadNum() + 1; + rs1.getCompactSplitThread().getSmallCompactionThreadNum() + 1; int newNumLargeThreads = - rs1.compactSplitThread.getLargeCompactionThreadNum() + 1; + rs1.getCompactSplitThread().getLargeCompactionThreadNum() + 1; conf.setInt("hbase.regionserver.thread.compaction.small", - newNumSmallThreads); + newNumSmallThreads); conf.setInt("hbase.regionserver.thread.compaction.large", - newNumLargeThreads); + newNumLargeThreads); rs1.getConfigurationManager().notifyAllObservers(conf); assertEquals(newNumSmallThreads, - rs1.compactSplitThread.getSmallCompactionThreadNum()); + rs1.getCompactSplitThread().getSmallCompactionThreadNum()); assertEquals(newNumLargeThreads, - rs1.compactSplitThread.getLargeCompactionThreadNum()); + rs1.getCompactSplitThread().getLargeCompactionThreadNum()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java index 2345dc9482b..9198bd5d265 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java @@ -208,7 +208,7 @@ public class TestCompactionWithThroughputController { TEST_UTIL.waitTableAvailable(tableName); HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); PressureAwareCompactionThroughputController throughputController = - (PressureAwareCompactionThroughputController) regionServer.compactSplitThread + (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread() .getCompactionThroughputController(); assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); Table table = conn.getTable(tableName); @@ -234,9 +234,9 @@ public class TestCompactionWithThroughputController { conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, NoLimitThroughputController.class.getName()); - regionServer.compactSplitThread.onConfigurationChange(conf); + regionServer.getCompactSplitThread().onConfigurationChange(conf); assertTrue(throughputController.isStopped()); - assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() + assertTrue(regionServer.getCompactSplitThread().getCompactionThroughputController() instanceof NoLimitThroughputController); } finally { conn.close();