From 81ebe6d40d485a3aee4451acc340cf6db62609b7 Mon Sep 17 00:00:00 2001 From: huzheng Date: Wed, 30 Jan 2019 16:15:02 +0800 Subject: [PATCH] HBASE-21764 Size of in-memory compaction thread pool shoud be configurable --- .../hbase/executor/ExecutorService.java | 16 ++++++- .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../regionserver/CompactingMemStore.java | 4 ++ .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../regionserver/RegionServicesForStores.java | 35 +++++++-------- .../regionserver/TestCompactingMemStore.java | 9 +++- .../hadoop/hbase/regionserver/TestHStore.java | 4 ++ .../TestRecoveredEditsReplayAndAbort.java | 1 + .../TestWalAndCompactingMemStoreFlush.java | 43 ++++++------------- .../wal/AbstractTestWALReplay.java | 1 + 10 files changed, 68 insertions(+), 52 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 4f8909e62b8..71d8ea5fa02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -133,13 +134,24 @@ public class ExecutorService { public void startExecutorService(final ExecutorType type, final int maxThreads) { String name = type.getExecutorName(this.servername); if (isExecutorServiceRunning(name)) { - LOG.debug("Executor service " + toString() + " already running on " + - this.servername); + LOG.debug("Executor service " + toString() + " already running on " + this.servername); return; } startExecutorService(name, maxThreads); } + /** + * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all + * paths should use this method to get the executor, should not start executor by using + * {@link ExecutorService#startExecutorService(ExecutorType, int)} + */ + public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) { + String name = type.getExecutorName(this.servername); + return executorMap + .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads)) + .getThreadPoolExecutor(); + } + public void submit(final EventHandler eh) { Executor executor = getExecutor(eh.getEventType().getExecutorServiceType()); if (executor == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 819f3691ba9..d354d62c901 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -49,7 +49,8 @@ public enum ExecutorType { RS_OPEN_PRIORITY_REGION (30), RS_REFRESH_PEER(31), RS_REPLAY_SYNC_REPLICATION_WAL(32), - RS_SWITCH_RPC_THROTTLE(33); + RS_SWITCH_RPC_THROTTLE(33), + RS_IN_MEMORY_COMPACTION(34); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 834d9c1d373..64c2c53ec06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -63,6 +63,10 @@ public class CompactingMemStore extends AbstractMemStore { public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1; + // In-Memory compaction pool size + public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY = + "hbase.regionserver.inmemory.compaction.pool.size"; + public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10; private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); private HStore store; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6b36fa98617..c95f61d25a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -298,7 +298,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Track data size in all memstores private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); - private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this); + @VisibleForTesting + RegionServicesForStores regionServicesForStores; // Debug possible data loss due to WAL off final LongAdder numMutationsWithoutWAL = new LongAdder(); @@ -784,6 +785,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.blockCache = rsServices.getBlockCache().orElse(null); this.mobFileCache = rsServices.getMobFileCache().orElse(null); } + this.regionServicesForStores = new RegionServicesForStores(this, rsServices); setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java index 91b23b36ece..ae9977e1ee6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -18,12 +18,10 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.wal.WAL; import org.apache.yetus.audience.InterfaceAudience; @@ -39,22 +37,18 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public class RegionServicesForStores { - private static final int POOL_SIZE = 10; - private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL = - new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = Thread.currentThread().getName() + "-inmemoryCompactions-" + - System.currentTimeMillis(); - return new Thread(r, name); - } - }); private final HRegion region; + private final RegionServerServices rsServices; + private int inMemoryPoolSize; - public RegionServicesForStores(HRegion region) { + public RegionServicesForStores(HRegion region, RegionServerServices rsServices) { this.region = region; + this.rsServices = rsServices; + if (this.rsServices != null) { + this.inMemoryPoolSize = rsServices.getConfiguration().getInt( + CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY, + CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT); + } } public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta, @@ -70,7 +64,14 @@ public class RegionServicesForStores { return region.getWAL(); } - public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; } + ThreadPoolExecutor getInMemoryCompactionPool() { + if (rsServices != null) { + return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION, + inMemoryPoolSize); + } else { + return null; + } + } public long getMemStoreFlushSize() { return region.getMemStoreFlushSize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 3d5a8ff4a6f..501d5cde2ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -25,6 +25,9 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -56,6 +59,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +115,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { new HRegionInfo(TableName.valueOf("foobar"), null, null, false); WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); - //this.region = hbaseUtility.createTestRegion("foobar", hcd); - this.regionServicesForStores = region.getRegionServicesForStores(); + this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); + ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); this.store = new HStore(region, hcd, conf); long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 74337696fa9..786334ebac3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -220,6 +221,9 @@ public class TestHStore { WALFactory wals = new WALFactory(walConf, methodName); region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, htd, null); + region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); + ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); } private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java index 7aeff84a825..17118a8030f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java @@ -122,6 +122,7 @@ public class TestRecoveredEditsReplayAndAbort { Mockito.when(rs.getNonceManager()).thenReturn(null); Mockito.when(rs.getServerName()).thenReturn(ServerName .valueOf("test", 0, 111)); + Mockito.when(rs.getConfiguration()).thenReturn(CONF); //create a region TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort"); TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 15bf2a441b4..edd8382b284 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -18,19 +18,17 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -40,6 +38,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; /** * This test verifies the correctness of the Per Column Family flushing strategy @@ -67,14 +66,14 @@ public class TestWalAndCompactingMemStoreFlush { private Configuration conf; private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { - int i=0; + int i = 0; HTableDescriptor htd = new HTableDescriptor(TABLENAME); for (byte[] family : FAMILIES) { HColumnDescriptor hcd = new HColumnDescriptor(family); // even column families are going to have compacted memstore - if(i%2 == 0) { - hcd.setInMemoryCompaction(MemoryCompactionPolicy.valueOf( - conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); + if (i % 2 == 0) { + hcd.setInMemoryCompaction(MemoryCompactionPolicy + .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); } else { hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE); } @@ -84,7 +83,12 @@ public class TestWalAndCompactingMemStoreFlush { HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); Path path = new Path(DIR, callingMethod); - return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); + HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false); + region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); + ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); + region.initialize(null); + return region; } // A helper function to create puts. @@ -109,31 +113,12 @@ public class TestWalAndCompactingMemStoreFlush { return p; } - // A helper function to create gets. - private Get createGet(int familyNum, int putNum) { - byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); - return new Get(row); - } - private void verifyInMemoryFlushSize(Region region) { assertEquals( ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); } - // A helper function to verify edits. - void verifyEdit(int familyNum, int putNum, Table table) throws IOException { - Result r = table.get(createGet(familyNum, putNum)); - byte[] family = FAMILIES[familyNum - 1]; - byte[] qf = Bytes.toBytes("q" + familyNum); - byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); - assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); - assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), - r.getFamilyMap(family).get(qf)); - assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), - Arrays.equals(r.getFamilyMap(family).get(qf), val)); - } - @Before public void setup() { conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 3f9040be74a..f2fd5916f77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -681,6 +681,7 @@ public abstract class AbstractTestWALReplay { RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); Mockito.doReturn(false).when(rsServices).isAborted(); when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); + when(rsServices.getConfiguration()).thenReturn(conf); Configuration customConf = new Configuration(this.conf); customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, CustomStoreFlusher.class.getName());