HBASE-25547 (addendum): Roll ExecutorType into ExecutorConfig
This commit is contained in:
parent
53128fe7c1
commit
4c822d7463
|
@ -127,25 +127,14 @@ public class ExecutorService {
|
|||
return getExecutor(type).getThreadPoolExecutor();
|
||||
}
|
||||
|
||||
public void startExecutorService(final ExecutorType type, final ExecutorConfig config) {
|
||||
String name = type.getExecutorName(this.servername);
|
||||
if (isExecutorServiceRunning(name)) {
|
||||
LOG.debug("Executor service {} already running on {}", this,
|
||||
this.servername);
|
||||
return;
|
||||
}
|
||||
startExecutorService(config.setName(name));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
|
||||
* paths should use this method to get the executor, should not start executor by using
|
||||
* {@link ExecutorService#startExecutorService(ExecutorConfig)}
|
||||
*/
|
||||
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) {
|
||||
String name = type.getExecutorName(this.servername);
|
||||
return executorMap.computeIfAbsent(name, (executorName) ->
|
||||
new Executor(config.setName(name))).getThreadPoolExecutor();
|
||||
public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) {
|
||||
return executorMap.computeIfAbsent(config.getName(), (executorName) ->
|
||||
new Executor(config)).getThreadPoolExecutor();
|
||||
}
|
||||
|
||||
public void submit(final EventHandler eh) {
|
||||
|
@ -184,7 +173,7 @@ public class ExecutorService {
|
|||
/**
|
||||
* Configuration wrapper for {@link Executor}.
|
||||
*/
|
||||
public static class ExecutorConfig {
|
||||
public class ExecutorConfig {
|
||||
// Refer to ThreadPoolExecutor javadoc for details of these configuration.
|
||||
// Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
|
||||
// implementation.
|
||||
|
@ -192,7 +181,16 @@ public class ExecutorService {
|
|||
private int corePoolSize = -1;
|
||||
private boolean allowCoreThreadTimeout = false;
|
||||
private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
|
||||
private String name;
|
||||
private ExecutorType executorType;
|
||||
|
||||
public ExecutorConfig setExecutorType(ExecutorType type) {
|
||||
this.executorType = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
private ExecutorType getExecutorType() {
|
||||
return Preconditions.checkNotNull(executorType, "ExecutorType not set.");
|
||||
}
|
||||
|
||||
public int getCorePoolSize() {
|
||||
return corePoolSize;
|
||||
|
@ -217,13 +215,11 @@ public class ExecutorService {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the executor name inferred from the type and the servername on which this is running.
|
||||
*/
|
||||
public String getName() {
|
||||
return Preconditions.checkNotNull(name);
|
||||
}
|
||||
|
||||
public ExecutorConfig setName(String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
return getExecutorType().getExecutorName(servername);
|
||||
}
|
||||
|
||||
public long getKeepAliveTimeMillis() {
|
||||
|
|
|
@ -1313,42 +1313,43 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Start the executor service pools
|
||||
final int masterOpenRegionPoolSize = conf.getInt(
|
||||
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
|
||||
new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_OPEN_REGION).setCorePoolSize(masterOpenRegionPoolSize));
|
||||
final int masterCloseRegionPoolSize = conf.getInt(
|
||||
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
|
||||
new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_CLOSE_REGION).setCorePoolSize(masterCloseRegionPoolSize));
|
||||
final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
|
||||
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
|
||||
new ExecutorConfig().setCorePoolSize(masterServerOpThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(masterServerOpThreads));
|
||||
final int masterServerMetaOpsThreads = conf.getInt(
|
||||
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
|
||||
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
|
||||
new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_META_SERVER_OPERATIONS).setCorePoolSize(masterServerMetaOpsThreads));
|
||||
final int masterLogReplayThreads = conf.getInt(
|
||||
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
|
||||
new ExecutorConfig().setCorePoolSize(masterLogReplayThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.M_LOG_REPLAY_OPS).setCorePoolSize(masterLogReplayThreads));
|
||||
final int masterSnapshotThreads = conf.getInt(
|
||||
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
|
||||
new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(masterSnapshotThreads)
|
||||
.setAllowCoreThreadTimeout(true));
|
||||
final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
|
||||
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS,
|
||||
new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads)
|
||||
.setAllowCoreThreadTimeout(true));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_MERGE_OPERATIONS).setCorePoolSize(masterMergeDispatchThreads)
|
||||
.setAllowCoreThreadTimeout(true));
|
||||
|
||||
// We depend on there being only one instance of this executor running
|
||||
// at a time. To do concurrency, would need fencing of enable/disable of
|
||||
// tables.
|
||||
// Any time changing this maxThreads to > 1, pls see the comment at
|
||||
// AccessController#postCompletedCreateTableAction
|
||||
this.executorService.startExecutorService(
|
||||
ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
|
||||
startProcedureExecutor();
|
||||
|
||||
// Create cleaner thread pool
|
||||
|
|
|
@ -2042,57 +2042,59 @@ public class HRegionServer extends Thread implements
|
|||
|
||||
// Start executor services
|
||||
final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
|
||||
new ExecutorConfig().setCorePoolSize(openRegionThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
|
||||
final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
|
||||
new ExecutorConfig().setCorePoolSize(openMetaThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
|
||||
final int openPriorityRegionThreads =
|
||||
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
|
||||
new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
|
||||
final int closeRegionThreads =
|
||||
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
|
||||
new ExecutorConfig().setCorePoolSize(closeRegionThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
|
||||
final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
|
||||
new ExecutorConfig().setCorePoolSize(closeMetaThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
|
||||
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
|
||||
final int storeScannerParallelSeekThreads =
|
||||
conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
|
||||
new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads)
|
||||
.setAllowCoreThreadTimeout(true));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
|
||||
.setAllowCoreThreadTimeout(true));
|
||||
}
|
||||
final int logReplayOpsThreads = conf.getInt(
|
||||
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
|
||||
new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
|
||||
.setAllowCoreThreadTimeout(true));
|
||||
// Start the threads for compacted files discharger
|
||||
final int compactionDischargerThreads =
|
||||
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
|
||||
new ExecutorConfig().setCorePoolSize(compactionDischargerThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
|
||||
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
|
||||
final int regionReplicaFlushThreads = conf.getInt(
|
||||
"hbase.regionserver.region.replica.flusher.threads", conf.getInt(
|
||||
"hbase.regionserver.executor.openregion.threads", 3));
|
||||
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
|
||||
new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
|
||||
}
|
||||
final int refreshPeerThreads =
|
||||
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
|
||||
new ExecutorConfig().setCorePoolSize(refreshPeerThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
|
||||
final int replaySyncReplicationWALThreads =
|
||||
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
|
||||
new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize(
|
||||
replaySyncReplicationWALThreads));
|
||||
final int switchRpcThrottleThreads =
|
||||
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
|
||||
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
|
||||
new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
|
||||
|
||||
Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
|
||||
uncaughtExceptionHandler);
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
|
@ -97,9 +98,10 @@ public class RegionServicesForStores {
|
|||
|
||||
ThreadPoolExecutor getInMemoryCompactionPool() {
|
||||
if (rsServices != null) {
|
||||
ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize);
|
||||
return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
|
||||
config);
|
||||
ExecutorService executorService = rsServices.getExecutorService();
|
||||
ExecutorConfig config = executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize);
|
||||
return executorService.getExecutorLazily(config);
|
||||
} else {
|
||||
// this could only happen in tests
|
||||
return getInMemoryCompactionPoolForTest();
|
||||
|
|
|
@ -59,8 +59,8 @@ public class TestExecutorStatusChore {
|
|||
|
||||
// Start an executor service pool with max 5 threads
|
||||
ExecutorService executorService = new ExecutorService("unit_test");
|
||||
executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
|
||||
new ExecutorConfig().setCorePoolSize(maxThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(maxThreads));
|
||||
|
||||
MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
|
||||
.getInstance(MetricsRegionServerSourceFactory.class).createServer(null);
|
||||
|
|
|
@ -71,8 +71,8 @@ public class TestExecutorService {
|
|||
|
||||
// Start an executor service pool with max 5 threads
|
||||
ExecutorService executorService = new ExecutorService("unit_test");
|
||||
executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
|
||||
new ExecutorConfig().setCorePoolSize(maxThreads));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads));
|
||||
|
||||
Executor executor =
|
||||
executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
|
||||
|
@ -197,8 +197,8 @@ public class TestExecutorService {
|
|||
when(server.getConfiguration()).thenReturn(conf);
|
||||
|
||||
ExecutorService executorService = new ExecutorService("unit_test");
|
||||
executorService.startExecutorService(
|
||||
ExecutorType.MASTER_SERVER_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1));
|
||||
|
||||
|
||||
executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
|
||||
|
@ -230,8 +230,8 @@ public class TestExecutorService {
|
|||
when(server.getConfiguration()).thenReturn(conf);
|
||||
|
||||
ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
|
||||
executorService.startExecutorService(
|
||||
ExecutorType.MASTER_SNAPSHOT_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1));
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch waitForEventToStart = new CountDownLatch(1);
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
|
||||
|
@ -193,7 +194,8 @@ public class TestHRegionReplayEvents {
|
|||
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
|
||||
.toString();
|
||||
ExecutorService es = new ExecutorService(string);
|
||||
es.startExecutorService(new ExecutorConfig().setCorePoolSize(1).setName(string + "-" + string));
|
||||
es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1).setExecutorType(
|
||||
ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
|
||||
when(rss.getExecutorService()).thenReturn(es);
|
||||
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
|
||||
primaryRegion.close();
|
||||
|
|
|
@ -212,8 +212,8 @@ public class TestSplitLogWorker {
|
|||
|
||||
SplitLogCounters.resetCounters();
|
||||
executorService = new ExecutorService("TestSplitLogWorker");
|
||||
executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
|
||||
new ExecutorConfig().setCorePoolSize(10));
|
||||
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
|
||||
ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
Loading…
Reference in New Issue