HBASE-21764 Size of in-memory compaction thread pool should be configurable

Author: huzheng
Date: 2019-01-30 16:15:02 +08:00
Commit: 0272839192 (parent ab233de2f0)
10 changed files with 69 additions and 53 deletions

ExecutorService.java

@@ -32,10 +32,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -126,13 +126,24 @@ public class ExecutorService {
   public void startExecutorService(final ExecutorType type, final int maxThreads) {
     String name = type.getExecutorName(this.servername);
     if (isExecutorServiceRunning(name)) {
-      LOG.debug("Executor service " + toString() + " already running on " +
-        this.servername);
+      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
       return;
     }
     startExecutorService(name, maxThreads);
   }
+
+  /**
+   * Initialize the executor lazily. Note that if an executor needs to be initialized lazily,
+   * then all paths should use this method to get the executor, and should not start the
+   * executor by using {@link ExecutorService#startExecutorService(ExecutorType, int)}.
+   */
+  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
+    String name = type.getExecutorName(this.servername);
+    return executorMap
+        .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
+        .getThreadPoolExecutor();
+  }
   public void submit(final EventHandler eh) {
     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
     if (executor == null) {
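For orientation, a minimal usage sketch of the lazy-initialization path added above; the wrapper class and method are hypothetical, while getExecutorLazily and ExecutorType.RS_IN_MEMORY_COMPACTION come from this patch:

import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;

final class InMemoryCompactionPools {
  // Callers that want the executor created on first use go through getExecutorLazily(...)
  // rather than startExecutorService(...), as the new javadoc recommends. The pool size
  // would normally come from hbase.regionserver.inmemory.compaction.pool.size.
  static ThreadPoolExecutor inMemoryCompactionPool(ExecutorService executors, int poolSize) {
    return executors.getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION, poolSize);
  }
}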

ExecutorType.java

@@ -47,7 +47,9 @@ public enum ExecutorType {
   RS_REGION_REPLICA_FLUSH_OPS (28),
   RS_COMPACTED_FILES_DISCHARGER (29),
   RS_OPEN_PRIORITY_REGION (30),
-  RS_REFRESH_PEER (31);
+  RS_REFRESH_PEER(31),
+  RS_SWITCH_RPC_THROTTLE(33),
+  RS_IN_MEMORY_COMPACTION(34);
 
   ExecutorType(int value) {
   }

CompactingMemStore.java

@@ -63,6 +63,10 @@ public class CompactingMemStore extends AbstractMemStore {
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
   private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
+  // In-Memory compaction pool size
+  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
+      "hbase.regionserver.inmemory.compaction.pool.size";
+  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
   private HStore store;
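The two constants above make the pool size tunable; a minimal sketch of overriding it in code (the class and the value 4 are illustrative only; in a deployment the key would normally be set in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;

final class PoolSizeConfigExample {
  static Configuration withSmallInMemoryCompactionPool() {
    Configuration conf = HBaseConfiguration.create();
    // Resolves to "hbase.regionserver.inmemory.compaction.pool.size"; the default is 10.
    conf.setInt(CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY, 4);
    return conf;
  }
}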

HRegion.java

@@ -295,7 +295,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // Track data size in all memstores
   private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
-  private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
+  @VisibleForTesting
+  RegionServicesForStores regionServicesForStores;
 
   // Debug possible data loss due to WAL off
   final LongAdder numMutationsWithoutWAL = new LongAdder();
@@ -773,6 +774,7 @@
       }
     }
     this.rsServices = rsServices;
+    this.regionServicesForStores = new RegionServicesForStores(this, rsServices);
     setHTableSpecificConf();
     this.scannerReadPoints = new ConcurrentHashMap<>();

RegionServicesForStores.java

@@ -18,12 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -39,22 +37,18 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public class RegionServicesForStores {
-  private static final int POOL_SIZE = 10;
-  private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL =
-      new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(),
-          new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-              String name = Thread.currentThread().getName() + "-inmemoryCompactions-" +
-                  System.currentTimeMillis();
-              return new Thread(r, name);
-            }
-          });
   private final HRegion region;
+  private final RegionServerServices rsServices;
+  private int inMemoryPoolSize;
 
-  public RegionServicesForStores(HRegion region) {
+  public RegionServicesForStores(HRegion region, RegionServerServices rsServices) {
     this.region = region;
+    this.rsServices = rsServices;
+    if (this.rsServices != null) {
+      this.inMemoryPoolSize = rsServices.getConfiguration().getInt(
+          CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY,
+          CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT);
+    }
   }
 
   public void blockUpdates() {
@@ -78,7 +72,14 @@ public class RegionServicesForStores {
     return region.getWAL();
   }
 
-  public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; }
+  ThreadPoolExecutor getInMemoryCompactionPool() {
+    if (rsServices != null) {
+      return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
+        inMemoryPoolSize);
+    } else {
+      return null;
+    }
+  }
 
   public long getMemStoreFlushSize() {
     return region.getMemStoreFlushSize();
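Compared with the old static INMEMORY_COMPACTION_POOL, the pool is now obtained lazily from the region server's shared ExecutorService and sized from configuration, and it can be null when no RegionServerServices is present (as in stand-alone unit tests). A caller-side sketch under that assumption; the class, method name, and inline fallback are illustrative, not part of the patch:

package org.apache.hadoop.hbase.regionserver; // same package, since the getter is now package-private

import java.util.concurrent.ThreadPoolExecutor;

final class InMemoryCompactionCaller {
  static void runInMemoryCompaction(RegionServicesForStores services, Runnable task) {
    ThreadPoolExecutor pool = services.getInMemoryCompactionPool();
    if (pool != null) {
      pool.execute(task); // normal case: hand the task to the shared, configurable pool
    } else {
      task.run(); // fallback chosen for this sketch only; the tests below stub the pool instead
    }
  }
}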

TestCompactingMemStore.java

@@ -25,6 +25,9 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -56,6 +59,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,8 +115,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
         new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
     WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
     this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
-    //this.region = hbaseUtility.createTestRegion("foobar", hcd);
-    this.regionServicesForStores = region.getRegionServicesForStores();
+    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
     this.store = new HStore(region, hcd, conf);
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()

TestHStore.java

@@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -219,6 +220,9 @@ public class TestHStore {
     WALFactory wals = new WALFactory(walConf, methodName);
     region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
       htd, null);
+    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
   }
 
   private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
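The three added lines above reappear in several of the tests touched by this patch; a hedged sketch of the pattern as a shared helper (the helper class and method name are hypothetical, the statements themselves are taken from the diff):

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.mockito.Mockito;

// Would live in org.apache.hadoop.hbase.regionserver so it can reach the now
// package-visible regionServicesForStores field.
final class TestPoolStubs {
  static void stubInMemoryCompactionPool(HRegion region) {
    // Replace the region's RegionServicesForStores with a spy whose in-memory compaction
    // pool is a local single-thread executor; these tests build HRegion without a real
    // RegionServerServices, so the real getter would return null.
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }
}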

TestRecoveredEditsReplayAndAbort.java

@@ -122,6 +122,7 @@ public class TestRecoveredEditsReplayAndAbort {
     Mockito.when(rs.getNonceManager()).thenReturn(null);
     Mockito.when(rs.getServerName()).thenReturn(ServerName
       .valueOf("test", 0, 111));
+    Mockito.when(rs.getConfiguration()).thenReturn(CONF);
     //create a region
     TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
     TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable)

TestWalAndCompactingMemStoreFlush.java

@@ -18,19 +18,17 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -40,6 +38,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 /**
  * This test verifies the correctness of the Per Column Family flushing strategy
@@ -67,14 +66,14 @@ public class TestWalAndCompactingMemStoreFlush {
   private Configuration conf;
 
   private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
-    int i=0;
+    int i = 0;
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     for (byte[] family : FAMILIES) {
       HColumnDescriptor hcd = new HColumnDescriptor(family);
       // even column families are going to have compacted memstore
-      if(i%2 == 0) {
-        hcd.setInMemoryCompaction(MemoryCompactionPolicy.valueOf(
-            conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
+      if (i % 2 == 0) {
+        hcd.setInMemoryCompaction(MemoryCompactionPolicy
+          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
       } else {
         hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
       }
@@ -84,7 +83,12 @@
     HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
     Path path = new Path(DIR, callingMethod);
-    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false);
+    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
+    region.initialize(null);
+    return region;
   }
 
   // A helper function to create puts.
@@ -109,31 +113,12 @@
     return p;
   }
 
-  // A helper function to create gets.
-  private Get createGet(int familyNum, int putNum) {
-    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
-    return new Get(row);
-  }
-
   private void verifyInMemoryFlushSize(Region region) {
     assertEquals(
       ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
       ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
   }
 
-  // A helper function to verify edits.
-  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
-    Result r = table.get(createGet(familyNum, putNum));
-    byte[] family = FAMILIES[familyNum - 1];
-    byte[] qf = Bytes.toBytes("q" + familyNum);
-    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
-    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
-    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
-      r.getFamilyMap(family).get(qf));
-    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
-      Arrays.equals(r.getFamilyMap(family).get(qf), val));
-  }
-
   @Before
   public void setup() {
     conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());

AbstractTestWALReplay.java

@@ -681,6 +681,7 @@ public abstract class AbstractTestWALReplay {
     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
     Mockito.doReturn(false).when(rsServices).isAborted();
     when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+    when(rsServices.getConfiguration()).thenReturn(conf);
     Configuration customConf = new Configuration(this.conf);
     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
       CustomStoreFlusher.class.getName());