diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 342d44144ae..20177a2a7c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -32,10 +32,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -126,13 +126,24 @@ public class ExecutorService {
   public void startExecutorService(final ExecutorType type, final int maxThreads) {
     String name = type.getExecutorName(this.servername);
     if (isExecutorServiceRunning(name)) {
-      LOG.debug("Executor service " + toString() + " already running on " +
-          this.servername);
+      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
       return;
     }
     startExecutorService(name, maxThreads);
   }
 
+  /**
+   * Initialize the executor lazily. Note that if an executor needs to be initialized lazily, all
+   * paths should use this method to get the executor, and should not start the executor via
+   * {@link ExecutorService#startExecutorService(ExecutorType, int)}.
+   */
+  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
+    String name = type.getExecutorName(this.servername);
+    return executorMap
+        .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
+        .getThreadPoolExecutor();
+  }
+
   public void submit(final EventHandler eh) {
     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
     if (executor == null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 7f130d1fc48..596385d0caa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -47,7 +47,9 @@ public enum ExecutorType {
   RS_REGION_REPLICA_FLUSH_OPS (28),
   RS_COMPACTED_FILES_DISCHARGER (29),
   RS_OPEN_PRIORITY_REGION (30),
-  RS_REFRESH_PEER (31);
+  RS_REFRESH_PEER(31),
+  RS_SWITCH_RPC_THROTTLE(33),
+  RS_IN_MEMORY_COMPACTION(34);
 
   ExecutorType(int value) {
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 21659bbfbc8..c1176efa3fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -63,6 +63,10 @@ public class CompactingMemStore extends AbstractMemStore {
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
   private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
+  // In-Memory compaction pool size
+  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
+      "hbase.regionserver.inmemory.compaction.pool.size";
+  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
   private HStore store;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 03f7487c1fa..32e3d12eec3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -295,7 +295,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   // Track data size in all memstores
   private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
-  private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
+  @VisibleForTesting
+  RegionServicesForStores regionServicesForStores;
 
   // Debug possible data loss due to WAL off
   final LongAdder numMutationsWithoutWAL = new LongAdder();
@@ -773,6 +774,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
     this.rsServices = rsServices;
+    this.regionServicesForStores = new RegionServicesForStores(this, rsServices);
     setHTableSpecificConf();
     this.scannerReadPoints = new ConcurrentHashMap<>();
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index c1af9db5bb9..68d46ea4c60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -18,12 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -39,22 +37,18 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public class RegionServicesForStores {
 
-  private static final int POOL_SIZE = 10;
-  private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL =
-      new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(),
-          new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-              String name = Thread.currentThread().getName() + "-inmemoryCompactions-" +
-                  System.currentTimeMillis();
-              return new Thread(r, name);
-            }
-          });
   private final HRegion region;
+  private final RegionServerServices rsServices;
+  private int inMemoryPoolSize;
 
-  public RegionServicesForStores(HRegion region) {
+  public RegionServicesForStores(HRegion region, RegionServerServices rsServices) {
     this.region = region;
+    this.rsServices = rsServices;
+    if (this.rsServices != null) {
+      this.inMemoryPoolSize = rsServices.getConfiguration().getInt(
+        CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY,
+        CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT);
+    }
   }
 
   public void blockUpdates() {
@@ -78,7 +72,14 @@ public class RegionServicesForStores {
     return region.getWAL();
   }
 
-  public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; }
+  ThreadPoolExecutor getInMemoryCompactionPool() {
+    if (rsServices != null) {
+      return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
+        inMemoryPoolSize);
+    } else {
+      return null;
+    }
+  }
 
   public long getMemStoreFlushSize() {
     return region.getMemStoreFlushSize();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index eef91d2e998..9afa052d659 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -25,6 +25,9 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -56,6 +59,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,8 +115,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
         new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
     WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
     this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
-    //this.region = hbaseUtility.createTestRegion("foobar", hcd);
-    this.regionServicesForStores = region.getRegionServicesForStores();
+    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
     this.store = new HStore(region, hcd, conf);
 
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 46ba4644094..9dbd3e01a7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -219,6 +220,9 @@ public class TestHStore {
     WALFactory wals = new WALFactory(walConf, methodName);
     region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
         htd, null);
+    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
   }
 
   private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
index b0cbd58f730..e55f3eea172 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -122,6 +122,7 @@ public class TestRecoveredEditsReplayAndAbort {
     Mockito.when(rs.getNonceManager()).thenReturn(null);
     Mockito.when(rs.getServerName()).thenReturn(ServerName
         .valueOf("test", 0, 111));
+    Mockito.when(rs.getConfiguration()).thenReturn(CONF);
     //create a region
     TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
     TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 15bf2a441b4..edd8382b284 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -18,19 +18,17 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -40,6 +38,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 /**
  * This test verifies the correctness of the Per Column Family flushing strategy
@@ -67,14 +66,14 @@ public class TestWalAndCompactingMemStoreFlush {
   private Configuration conf;
 
   private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
-    int i=0;
+    int i = 0;
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     for (byte[] family : FAMILIES) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
       // even column families are going to have compacted memstore
-      if(i%2 == 0) {
-        hcd.setInMemoryCompaction(MemoryCompactionPolicy.valueOf(
-            conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
+      if (i % 2 == 0) {
+        hcd.setInMemoryCompaction(MemoryCompactionPolicy
+          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
       } else {
         hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
       }
@@ -84,7 +83,12 @@ public class TestWalAndCompactingMemStoreFlush {
 
     HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
     Path path = new Path(DIR, callingMethod);
-    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false);
+    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
+    region.initialize(null);
+    return region;
   }
 
   // A helper function to create puts.
@@ -109,31 +113,12 @@ public class TestWalAndCompactingMemStoreFlush {
     return p;
   }
 
-  // A helper function to create gets.
-  private Get createGet(int familyNum, int putNum) {
-    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
-    return new Get(row);
-  }
-
   private void verifyInMemoryFlushSize(Region region) {
     assertEquals(
       ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
       ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
   }
 
-  // A helper function to verify edits.
-  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
-    Result r = table.get(createGet(familyNum, putNum));
-    byte[] family = FAMILIES[familyNum - 1];
-    byte[] qf = Bytes.toBytes("q" + familyNum);
-    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
-    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
-    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
-        r.getFamilyMap(family).get(qf));
-    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
-        Arrays.equals(r.getFamilyMap(family).get(qf), val));
-  }
-
   @Before
   public void setup() {
     conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 51827f84978..76633132f5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -681,6 +681,7 @@ public abstract class AbstractTestWALReplay {
     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
     Mockito.doReturn(false).when(rsServices).isAborted();
     when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+    when(rsServices.getConfiguration()).thenReturn(conf);
     Configuration customConf = new Configuration(this.conf);
     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
       CustomStoreFlusher.class.getName());
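
Reviewer note, not part of the patch: a minimal sketch of how the lazily created in-memory compaction executor is looked up after this change. The example class and method names are made up for illustration; the calls it uses (getConfiguration(), getExecutorService(), getExecutorLazily(), and the two CompactingMemStore constants) are exactly the ones the patch introduces or relies on. Before this change every JVM shared one static 10-thread pool inside RegionServicesForStores; now the pool belongs to the region server's ExecutorService and its size is configurable.

import java.util.concurrent.ThreadPoolExecutor;

import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

// Hypothetical helper, only to illustrate the lazy-pool lookup path.
final class InMemoryCompactionPoolExample {

  // Mirrors what RegionServicesForStores#getInMemoryCompactionPool now does:
  // the pool size comes from hbase.regionserver.inmemory.compaction.pool.size
  // (default 10), and the executor is created on the first call and shared by
  // all stores on the region server afterwards.
  static ThreadPoolExecutor inMemoryCompactionPool(RegionServerServices rsServices) {
    int poolSize = rsServices.getConfiguration().getInt(
        CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY,
        CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT);
    return rsServices.getExecutorService().getExecutorLazily(
        ExecutorType.RS_IN_MEMORY_COMPACTION, poolSize);
  }
}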