diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 507bc499998..cc36b957c4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -127,25 +127,14 @@ public class ExecutorService { return getExecutor(type).getThreadPoolExecutor(); } - public void startExecutorService(final ExecutorType type, final ExecutorConfig config) { - String name = type.getExecutorName(this.servername); - if (isExecutorServiceRunning(name)) { - LOG.debug("Executor service {} already running on {}", this, - this.servername); - return; - } - startExecutorService(config.setName(name)); - } - /** * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all * paths should use this method to get the executor, should not start executor by using * {@link ExecutorService#startExecutorService(ExecutorConfig)} */ - public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) { - String name = type.getExecutorName(this.servername); - return executorMap.computeIfAbsent(name, (executorName) -> - new Executor(config.setName(name))).getThreadPoolExecutor(); + public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) { + return executorMap.computeIfAbsent(config.getName(), (executorName) -> + new Executor(config)).getThreadPoolExecutor(); } public void submit(final EventHandler eh) { @@ -184,7 +173,7 @@ public class ExecutorService { /** * Configuration wrapper for {@link Executor}. */ - public static class ExecutorConfig { + public class ExecutorConfig { // Refer to ThreadPoolExecutor javadoc for details of these configuration. // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor // implementation. @@ -192,7 +181,16 @@ public class ExecutorService { private int corePoolSize = -1; private boolean allowCoreThreadTimeout = false; private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT; - private String name; + private ExecutorType executorType; + + public ExecutorConfig setExecutorType(ExecutorType type) { + this.executorType = type; + return this; + } + + private ExecutorType getExecutorType() { + return Preconditions.checkNotNull(executorType, "ExecutorType not set."); + } public int getCorePoolSize() { return corePoolSize; @@ -217,13 +215,11 @@ public class ExecutorService { return this; } + /** + * @return the executor name inferred from the type and the servername on which this is running. + */ public String getName() { - return Preconditions.checkNotNull(name); - } - - public ExecutorConfig setName(String name) { - this.name = name; - return this; + return getExecutorType().getExecutorName(servername); } public long getKeepAliveTimeMillis() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 74f199c7b71..900bcd81f12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1313,42 +1313,43 @@ public class HMaster extends HRegionServer implements MasterServices { // Start the executor service pools final int masterOpenRegionPoolSize = conf.getInt( HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, - new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_OPEN_REGION).setCorePoolSize(masterOpenRegionPoolSize)); final int masterCloseRegionPoolSize = conf.getInt( HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, - new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_CLOSE_REGION).setCorePoolSize(masterCloseRegionPoolSize)); final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS, HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - new ExecutorConfig().setCorePoolSize(masterServerOpThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(masterServerOpThreads)); final int masterServerMetaOpsThreads = conf.getInt( HConstants.MASTER_META_SERVER_OPERATIONS_THREADS, HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, - new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_META_SERVER_OPERATIONS).setCorePoolSize(masterServerMetaOpsThreads)); final int masterLogReplayThreads = conf.getInt( HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, - new ExecutorConfig().setCorePoolSize(masterLogReplayThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.M_LOG_REPLAY_OPS).setCorePoolSize(masterLogReplayThreads)); final int masterSnapshotThreads = conf.getInt( SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, - new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(masterSnapshotThreads) + .setAllowCoreThreadTimeout(true)); final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS, HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT); - this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, - new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads) - .setAllowCoreThreadTimeout(true)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_MERGE_OPERATIONS).setCorePoolSize(masterMergeDispatchThreads) + .setAllowCoreThreadTimeout(true)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of // tables. // Any time changing this maxThreads to > 1, pls see the comment at // AccessController#postCompletedCreateTableAction - this.executorService.startExecutorService( - ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1)); startProcedureExecutor(); // Create cleaner thread pool diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f2379ddd34d..1c76b249a47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2042,57 +2042,59 @@ public class HRegionServer extends Thread implements // Start executor services final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); - this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, - new ExecutorConfig().setCorePoolSize(openRegionThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); - this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, - new ExecutorConfig().setCorePoolSize(openMetaThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); final int openPriorityRegionThreads = conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); - this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, - new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads)); final int closeRegionThreads = conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); - this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, - new ExecutorConfig().setCorePoolSize(closeRegionThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); - this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, - new ExecutorConfig().setCorePoolSize(closeMetaThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { final int storeScannerParallelSeekThreads = conf.getInt("hbase.storescanner.parallel.seek.threads", 10); - this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, - new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads) - .setAllowCoreThreadTimeout(true)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads) + .setAllowCoreThreadTimeout(true)); } final int logReplayOpsThreads = conf.getInt( HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); - this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, - new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads) + .setAllowCoreThreadTimeout(true)); // Start the threads for compacted files discharger final int compactionDischargerThreads = conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); - this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, - new ExecutorConfig().setCorePoolSize(compactionDischargerThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads)); if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { final int regionReplicaFlushThreads = conf.getInt( "hbase.regionserver.region.replica.flusher.threads", conf.getInt( "hbase.regionserver.executor.openregion.threads", 3)); - this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, - new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads)); } final int refreshPeerThreads = conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); - this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, - new ExecutorConfig().setCorePoolSize(refreshPeerThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); final int replaySyncReplicationWALThreads = conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1); - this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL, - new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize( + replaySyncReplicationWALThreads)); final int switchRpcThrottleThreads = conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); - this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE, - new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java index fc6b371193a..eda04da62a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -22,6 +22,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.io.ByteBuffAllocator; @@ -97,9 +98,10 @@ public class RegionServicesForStores { ThreadPoolExecutor getInMemoryCompactionPool() { if (rsServices != null) { - ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize); - return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION, - config); + ExecutorService executorService = rsServices.getExecutorService(); + ExecutorConfig config = executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize); + return executorService.getExecutorLazily(config); } else { // this could only happen in tests return getInMemoryCompactionPoolForTest(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java index 96df8ee5646..4f645720b0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java @@ -59,8 +59,8 @@ public class TestExecutorStatusChore { // Start an executor service pool with max 5 threads ExecutorService executorService = new ExecutorService("unit_test"); - executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, - new ExecutorConfig().setCorePoolSize(maxThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(maxThreads)); MetricsRegionServerSource serverSource = CompatibilitySingletonFactory .getInstance(MetricsRegionServerSourceFactory.class).createServer(null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index 1021d233a1b..6b58d073e03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -71,8 +71,8 @@ public class TestExecutorService { // Start an executor service pool with max 5 threads ExecutorService executorService = new ExecutorService("unit_test"); - executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - new ExecutorConfig().setCorePoolSize(maxThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads)); Executor executor = executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS); @@ -197,8 +197,8 @@ public class TestExecutorService { when(server.getConfiguration()).thenReturn(conf); ExecutorService executorService = new ExecutorService("unit_test"); - executorService.startExecutorService( - ExecutorType.MASTER_SERVER_OPERATIONS, new ExecutorConfig().setCorePoolSize(1)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1)); executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) { @@ -230,8 +230,8 @@ public class TestExecutorService { when(server.getConfiguration()).thenReturn(conf); ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); - executorService.startExecutorService( - ExecutorType.MASTER_SNAPSHOT_OPERATIONS, new ExecutorConfig().setCorePoolSize(1)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1)); CountDownLatch latch = new CountDownLatch(1); CountDownLatch waitForEventToStart = new CountDownLatch(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 8080a4f4ba3..742d7112572 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; +import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; @@ -193,7 +194,8 @@ public class TestHRegionReplayEvents { String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER .toString(); ExecutorService es = new ExecutorService(string); - es.startExecutorService(new ExecutorConfig().setCorePoolSize(1).setName(string + "-" + string)); + es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1).setExecutorType( + ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); when(rss.getExecutorService()).thenReturn(es); primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); primaryRegion.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index cd950d6b7af..472f587d242 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -212,8 +212,8 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); - executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, - new ExecutorConfig().setCorePoolSize(10)); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10)); } @After