HBASE-25547: Thread pools should release unused resources (#2922)

Plumbs the configuration needed to enable core thread timeout on non-critical thread pools.
Currently only enabled for thread pools with op-codes RS_LOG_REPLAY_OPS, RS_PARALLEL_SEEK, MASTER_SNAPSHOT_OPERATIONS, MASTER_MERGE_OPERATIONS. Others can be added later as
needed.

Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
Bharath Vissapragada 2021-02-16 11:12:00 -08:00 committed by GitHub
parent b6649a8784
commit 618236dd90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 162 additions and 63 deletions

View File

@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@ -50,11 +51,10 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* and a <code>Runnable</code> that handles the object that is added to the queue. * and a <code>Runnable</code> that handles the object that is added to the queue.
* *
* <p>In order to create a new service, create an instance of this class and * <p>In order to create a new service, create an instance of this class and
* then do: <code>instance.startExecutorService("myService");</code>. When done * then do: <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig}
* call {@link #shutdown()}. * wraps the configuration needed by this service. When done call {@link #shutdown()}.
* *
* <p>In order to use the service created above, call * <p>In order to use the service created above, call {@link #submit(EventHandler)}.
* {@link #submit(EventHandler)}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ExecutorService { public class ExecutorService {
@ -81,15 +81,16 @@ public class ExecutorService {
/** /**
* Start an executor service with a given name. If there was a service already * Start an executor service with a given name. If there was a service already
* started with the same name, this throws a RuntimeException. * started with the same name, this throws a RuntimeException.
* @param name Name of the service to start. * @param config Configuration to use for the executor.
*/ */
public void startExecutorService(String name, int maxThreads) { public void startExecutorService(final ExecutorConfig config) {
final String name = config.getName();
Executor hbes = this.executorMap.compute(name, (key, value) -> { Executor hbes = this.executorMap.compute(name, (key, value) -> {
if (value != null) { if (value != null) {
throw new RuntimeException("An executor service with the name " + key + throw new RuntimeException("An executor service with the name " + key +
" is already running!"); " is already running!");
} }
return new Executor(key, maxThreads); return new Executor(config);
}); });
LOG.debug( LOG.debug(
@ -119,34 +120,32 @@ public class ExecutorService {
} }
Executor getExecutor(String name) { Executor getExecutor(String name) {
Executor executor = this.executorMap.get(name); return this.executorMap.get(name);
return executor;
} }
public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) { public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
return getExecutor(type).getThreadPoolExecutor(); return getExecutor(type).getThreadPoolExecutor();
} }
public void startExecutorService(final ExecutorType type, final int maxThreads) { public void startExecutorService(final ExecutorType type, final ExecutorConfig config) {
String name = type.getExecutorName(this.servername); String name = type.getExecutorName(this.servername);
if (isExecutorServiceRunning(name)) { if (isExecutorServiceRunning(name)) {
LOG.debug("Executor service {} already running on {}", this, LOG.debug("Executor service {} already running on {}", this,
this.servername); this.servername);
return; return;
} }
startExecutorService(name, maxThreads); startExecutorService(config.setName(name));
} }
/** /**
* Initialize the executor lazily, Note if an executor need to be initialized lazily, then all * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
* paths should use this method to get the executor, should not start executor by using * paths should use this method to get the executor, should not start executor by using
* {@link ExecutorService#startExecutorService(ExecutorType, int)} * {@link ExecutorService#startExecutorService(ExecutorConfig)}
*/ */
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) { public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) {
String name = type.getExecutorName(this.servername); String name = type.getExecutorName(this.servername);
return executorMap return executorMap.computeIfAbsent(name, (executorName) ->
.computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads)) new Executor(config.setName(name))).getThreadPoolExecutor();
.getThreadPoolExecutor();
} }
public void submit(final EventHandler eh) { public void submit(final EventHandler eh) {
@ -182,12 +181,65 @@ public class ExecutorService {
return ret; return ret;
} }
/**
* Configuration wrapper for {@link Executor}.
*/
public static class ExecutorConfig {
// Refer to ThreadPoolExecutor javadoc for details of these configuration.
// Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
// implementation.
public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
private int corePoolSize = -1;
private boolean allowCoreThreadTimeout = false;
private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
private String name;
public int getCorePoolSize() {
return corePoolSize;
}
public ExecutorConfig setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
public boolean allowCoreThreadTimeout() {
return allowCoreThreadTimeout;
}
/**
* Allows timing out of core threads. Good to set this for non-critical thread pools for
* release of unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut}
* for additional details.
*/
public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
return this;
}
public String getName() {
return Preconditions.checkNotNull(name);
}
public ExecutorConfig setName(String name) {
this.name = name;
return this;
}
public long getKeepAliveTimeMillis() {
return keepAliveTimeMillis;
}
public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
this.keepAliveTimeMillis = keepAliveTimeMillis;
return this;
}
}
/** /**
* Executor instance. * Executor instance.
*/ */
static class Executor { static class Executor {
// how long to retain excess threads
static final long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests // the thread pool executor that services the requests
final TrackingThreadPoolExecutor threadPoolExecutor; final TrackingThreadPoolExecutor threadPoolExecutor;
// work queue to use - unbounded queue // work queue to use - unbounded queue
@ -196,13 +248,15 @@ public class ExecutorService {
private static final AtomicLong seqids = new AtomicLong(0); private static final AtomicLong seqids = new AtomicLong(0);
private final long id; private final long id;
protected Executor(String name, int maxThreads) { protected Executor(ExecutorConfig config) {
this.id = seqids.incrementAndGet(); this.id = seqids.incrementAndGet();
this.name = name; this.name = config.getName();
// create the thread pool executor // create the thread pool executor
this.threadPoolExecutor = new TrackingThreadPoolExecutor( this.threadPoolExecutor = new TrackingThreadPoolExecutor(
maxThreads, maxThreads, // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q); config.getCorePoolSize(), config.getCorePoolSize(),
config.getKeepAliveTimeMillis(), TimeUnit.MILLISECONDS, q);
this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
// name the threads for this threadpool // name the threads for this threadpool
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat(this.name + "-%d"); tfb.setNameFormat(this.name + "-%d");

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException; import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@ -1310,30 +1311,44 @@ public class HMaster extends HRegionServer implements MasterServices {
*/ */
private void startServiceThreads() throws IOException { private void startServiceThreads() throws IOException {
// Start the executor service pools // Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt( final int masterOpenRegionPoolSize = conf.getInt(
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT)); HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt( this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT)); new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize));
final int masterCloseRegionPoolSize = conf.getInt(
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize));
final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS, new ExecutorConfig().setCorePoolSize(masterServerOpThreads));
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT)); final int masterServerMetaOpsThreads = conf.getInt(
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS, new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads));
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT)); final int masterLogReplayThreads = conf.getInt(
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt( HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT)); this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt( new ExecutorConfig().setCorePoolSize(masterLogReplayThreads));
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT)); final int masterSnapshotThreads = conf.getInt(
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt( SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
HConstants.MASTER_MERGE_DISPATCH_THREADS, this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT)); new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS,
new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads)
.setAllowCoreThreadTimeout(true));
// We depend on there being only one instance of this executor running // We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of // at a time. To do concurrency, would need fencing of enable/disable of
// tables. // tables.
// Any time changing this maxThreads to > 1, pls see the comment at // Any time changing this maxThreads to > 1, pls see the comment at
// AccessController#postCompletedCreateTableAction // AccessController#postCompletedCreateTableAction
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); this.executorService.startExecutorService(
ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
startProcedureExecutor(); startProcedureExecutor();
// Create cleaner thread pool // Create cleaner thread pool

View File

@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.http.InfoServer;
@ -2040,36 +2041,58 @@ public class HRegionServer extends Thread implements
choreService.scheduleChore(compactedFileDischarger); choreService.scheduleChore(compactedFileDischarger);
// Start executor services // Start executor services
final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); new ExecutorConfig().setCorePoolSize(openRegionThreads));
final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); new ExecutorConfig().setCorePoolSize(openMetaThreads));
final int openPriorityRegionThreads =
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads));
final int closeRegionThreads =
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); new ExecutorConfig().setCorePoolSize(closeRegionThreads));
final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); new ExecutorConfig().setCorePoolSize(closeMetaThreads));
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
final int storeScannerParallelSeekThreads =
conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads)
.setAllowCoreThreadTimeout(true));
} }
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( final int logReplayOpsThreads = conf.getInt(
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
// Start the threads for compacted files discharger // Start the threads for compacted files discharger
final int compactionDischargerThreads =
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); new ExecutorConfig().setCorePoolSize(compactionDischargerThreads));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
final int regionReplicaFlushThreads = conf.getInt(
"hbase.regionserver.region.replica.flusher.threads", conf.getInt(
"hbase.regionserver.executor.openregion.threads", 3));
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads", new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads));
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
} }
final int refreshPeerThreads =
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); new ExecutorConfig().setCorePoolSize(refreshPeerThreads));
final int replaySyncReplicationWALThreads =
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL, this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1)); new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads));
final int switchRpcThrottleThreads =
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE, this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1)); new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads));
Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler); uncaughtExceptionHandler);

View File

@ -22,6 +22,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -96,8 +97,9 @@ public class RegionServicesForStores {
ThreadPoolExecutor getInMemoryCompactionPool() { ThreadPoolExecutor getInMemoryCompactionPool() {
if (rsServices != null) { if (rsServices != null) {
ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize);
return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION, return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
inMemoryPoolSize); config);
} else { } else {
// this could only happen in tests // this could only happen in tests
return getInMemoryCompactionPoolForTest(); return getInMemoryCompactionPoolForTest();

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler; import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource; import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
@ -58,8 +59,8 @@ public class TestExecutorStatusChore {
// Start an executor service pool with max 5 threads // Start an executor service pool with max 5 threads
ExecutorService executorService = new ExecutorService("unit_test"); ExecutorService executorService = new ExecutorService("unit_test");
executorService.startExecutorService( executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
ExecutorType.RS_PARALLEL_SEEK, maxThreads); new ExecutorConfig().setCorePoolSize(maxThreads));
MetricsRegionServerSource serverSource = CompatibilitySingletonFactory MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).createServer(null); .getInstance(MetricsRegionServerSourceFactory.class).createServer(null);

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.executor.ExecutorService.Executor; import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -70,8 +71,8 @@ public class TestExecutorService {
// Start an executor service pool with max 5 threads // Start an executor service pool with max 5 threads
ExecutorService executorService = new ExecutorService("unit_test"); ExecutorService executorService = new ExecutorService("unit_test");
executorService.startExecutorService( executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads); new ExecutorConfig().setCorePoolSize(maxThreads));
Executor executor = Executor executor =
executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS); executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
@ -138,7 +139,7 @@ public class TestExecutorService {
} }
// Make sure threads are still around even after their timetolive expires. // Make sure threads are still around even after their timetolive expires.
Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2); Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2);
assertEquals(maxThreads, pool.getPoolSize()); assertEquals(maxThreads, pool.getPoolSize());
executorService.shutdown(); executorService.shutdown();
@ -197,7 +198,7 @@ public class TestExecutorService {
ExecutorService executorService = new ExecutorService("unit_test"); ExecutorService executorService = new ExecutorService("unit_test");
executorService.startExecutorService( executorService.startExecutorService(
ExecutorType.MASTER_SERVER_OPERATIONS, 1); ExecutorType.MASTER_SERVER_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) { executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
@ -229,7 +230,8 @@ public class TestExecutorService {
when(server.getConfiguration()).thenReturn(conf); when(server.getConfiguration()).thenReturn(conf);
ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1); executorService.startExecutorService(
ExecutorType.MASTER_SNAPSHOT_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waitForEventToStart = new CountDownLatch(1); CountDownLatch waitForEventToStart = new CountDownLatch(1);

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
@ -192,8 +193,7 @@ public class TestHRegionReplayEvents {
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
.toString(); .toString();
ExecutorService es = new ExecutorService(string); ExecutorService es = new ExecutorService(string);
es.startExecutorService( es.startExecutorService(new ExecutorConfig().setCorePoolSize(1).setName(string + "-" + string));
string+"-"+string, 1);
when(rss.getExecutorService()).thenReturn(es); when(rss.getExecutorService()).thenReturn(es);
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
primaryRegion.close(); primaryRegion.close();

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -211,7 +212,8 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters(); SplitLogCounters.resetCounters();
executorService = new ExecutorService("TestSplitLogWorker"); executorService = new ExecutorService("TestSplitLogWorker");
executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
new ExecutorConfig().setCorePoolSize(10));
} }
@After @After