From 7a90c1a023b58357a177ac2316a419e17ffe7680 Mon Sep 17 00:00:00 2001 From: linkaline Date: Thu, 29 Aug 2019 00:48:27 +0800 Subject: [PATCH] HBASE-22881 Fix non-daemon threads in hbase server implementation (#512) (#536) Signed-off-by: stack --- .../hbase/client/TestClientNoCluster.java | 3 +- .../org/apache/hadoop/hbase/util/Threads.java | 2 +- .../hadoop/hbase/backup/HFileArchiver.java | 4 ++- .../master/MasterMobCompactionThread.java | 21 +++++------- .../assignment/SplitTableRegionProcedure.java | 2 +- ...egionServerFlushTableProcedureManager.java | 10 +++--- .../hbase/regionserver/CompactSplit.java | 34 ++++++------------- .../snapshot/RegionServerSnapshotManager.java | 8 ++--- .../hadoop/hbase/regionserver/wal/FSHLog.java | 3 +- .../HBaseInterClusterReplicationEndpoint.java | 8 ++--- .../regionserver/HFileReplicator.java | 21 +++++------- .../ReplicationSourceManager.java | 4 +-- .../hbase/snapshot/SnapshotManifest.java | 2 +- .../hbase/tool/HFileContentValidator.java | 2 +- .../org/apache/hadoop/hbase/util/FSUtils.java | 3 +- .../hadoop/hbase/util/ModifyRegionUtils.java | 10 +----- .../procedure/SimpleRSProcedureManager.java | 6 ++-- .../hbase/thrift/IncrementCoalescer.java | 30 ++-------------- 18 files changed, 62 insertions(+), 111 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index 3cab09dec58..c5858bd30df 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -807,7 +807,8 @@ public class TestClientNoCluster extends Configured implements Tool { // Have them all share the same connection so they all share the same instance of // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server. - final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p")); + final ExecutorService pool = Executors.newCachedThreadPool( + Threads.newDaemonThreadFactory("p")); // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p")); // Share a connection so I can keep counts in the 'server' on concurrency. final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 352734064ab..1ca6c2e583e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -204,7 +204,7 @@ public class Threads { * @param prefix The prefix of every created Thread's name * @return a {@link java.util.concurrent.ThreadFactory} that names threads */ - public static ThreadFactory getNamedThreadFactory(final String prefix) { + private static ThreadFactory getNamedThreadFactory(final String prefix) { SecurityManager s = System.getSecurityManager(); final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() .getThreadGroup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index 53d2dbb0653..d336b4d1270 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -226,7 +226,9 @@ public class HFileArchiver { @Override public Thread newThread(Runnable r) { final String name = "HFileArchiver-" + threadNumber.getAndIncrement(); - return new Thread(r, name); + Thread t = new Thread(r, name); + t.setDaemon(true); + return t; } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java index 9d6da0c1ffb..0779eeafe8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java @@ -23,21 +23,21 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.procedure2.LockType; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The mob compaction thread used in {@link MasterRpcServices} @@ -55,14 +55,11 @@ public class MasterMobCompactionThread { this.conf = master.getConfiguration(); final String n = Thread.currentThread().getName(); // this pool is used to run the mob compaction - this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, - new SynchronousQueue<>(), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime(); - return new Thread(r, name); - } - }); + this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, + TimeUnit.SECONDS, new SynchronousQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime()) + .build()); ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true); // this pool is used in the mob compaction to compact the mob files by partitions // in parallel diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index a6de3c5c3df..d713192b50e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -665,7 +665,7 @@ public class SplitTableRegionProcedure LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" + getParentRegion().getShortNameToLog() + ", threads=" + maxThreads); final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads, - Threads.getNamedThreadFactory("StoreFileSplitter-%1$d")); + Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d")); final List>> futures = new ArrayList>>(nbFiles); // Split each store file. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index bf55c0c885c..ddd667fc6ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -25,11 +25,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DaemonThreadFactory; @@ -47,7 +45,9 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -213,10 +213,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT); int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); this.name = name; - executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs(" - + name + ")-flush-proc-pool")); - executor.allowCoreThreadTimeOut(true); + executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, + new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-")); taskPool = new ExecutorCompletionService<>(executor); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 9276af792df..bf5807367c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -56,8 +55,10 @@ import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Compact region on request and then run split if appropriate @@ -118,14 +119,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati private void createSplitExcecutors() { final String n = Thread.currentThread().getName(); int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); - this.splits = - (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-splits-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); + this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, + new ThreadFactoryBuilder().setNameFormat(n + "-splits-" + System.currentTimeMillis()) + .setDaemon(true).build()); } private void createCompactionExecutors() { @@ -144,24 +140,16 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati StealJobQueue stealJobQueue = new StealJobQueue(COMPARATOR); this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, stealJobQueue, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-longCompactions-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); + new ThreadFactoryBuilder() + .setNameFormat(n + "-longCompactions-" + System.currentTimeMillis()) + .setDaemon(true).build()); this.longCompactions.setRejectedExecutionHandler(new Rejection()); this.longCompactions.prestartAllCoreThreads(); this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-shortCompactions-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); + new ThreadFactoryBuilder() + .setNameFormat(n + "-shortCompactions-" + System.currentTimeMillis()) + .setDaemon(true).build()); this.shortCompactions.setRejectedExecutionHandler(new Rejection()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java index 08335ab4f11..579bb24bc35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java @@ -26,7 +26,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -283,10 +283,8 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS); this.name = name; - executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs(" - + name + ")-snapshot-pool")); - executor.allowCoreThreadTimeOut(true); + executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, + new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-")); taskPool = new ExecutorCompletionService<>(executor); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index f10b1ce0042..ed61784a306 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -222,7 +222,8 @@ public class FSHLog extends AbstractFSWAL { // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // spinning as other strategies do. this.disruptor = new Disruptor<>(RingBufferTruck::new, - getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"), + getPreallocatedEventCount(), + Threads.newDaemonThreadFactory(hostingThreadName + ".append"), ProducerType.MULTI, new BlockingWaitStrategy()); // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 0014d9d0921..93a850e9303 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -32,7 +32,6 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -64,6 +64,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; @@ -141,9 +142,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); - this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); - this.exec.allowCoreThreadTimeOut(true); + this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build()); this.abortable = ctx.getAbortable(); // Set the size limit for replication RPCs to 95% of the max request size. // We could do with less slop if we have an accurate estimate of encoded size. Being diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 1f44817fe5f..ff54f41826d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -10,8 +10,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -27,7 +25,6 @@ import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,9 +36,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; @@ -53,6 +47,12 @@ import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * It is used for replicating HFile entries. It will first copy parallely all the hfiles to a local @@ -105,12 +105,9 @@ public class HFileReplicator { this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY, REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT); - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat("HFileReplicationCallable-%1$d"); - this.exec = - new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), builder.build()); - this.exec.allowCoreThreadTimeOut(true); + this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HFileReplicationCallable-%1$d").build()); this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY, REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5d4f0349add..585245eb025 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -196,8 +196,8 @@ public class ReplicationSourceManager implements ReplicationListener { int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it - this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>()); + this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java index adb04c735bc..e631d183146 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java @@ -571,7 +571,7 @@ public final class SnapshotManifest { public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) { int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8); return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, - Threads.getNamedThreadFactory(name)); + Threads.newDaemonThreadFactory(name)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java index d60844bf953..dd711483517 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java @@ -108,7 +108,7 @@ public class HFileContentValidator extends AbstractHBaseTool { int availableProcessors = Runtime.getRuntime().availableProcessors(); int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors); return Executors.newFixedThreadPool(numThreads, - Threads.getNamedThreadFactory("hfile-validator")); + Threads.newDaemonThreadFactory("hfile-validator")); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 2ebef45bbaa..d719796c481 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -1623,7 +1623,8 @@ public abstract class FSUtils extends CommonFSUtils { // run in multiple threads ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(statusList.length)); + new ArrayBlockingQueue<>(statusList.length), + Threads.newDaemonThreadFactory("FSRegionQuery")); try { // ignore all file status items that are not of interest for (FileStatus regionStatus : statusList) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index 1c860b42e98..79544fbfa92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -28,7 +28,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -235,14 +234,7 @@ public abstract class ModifyRegionUtils { "hbase.hregion.open.and.init.threads.max", 16)); ThreadPoolExecutor regionOpenAndInitThreadPool = Threads .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, - new ThreadFactory() { - private int count = 1; - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, threadNamePrefix + "-" + count++); - } - }); + Threads.newDaemonThreadFactory(threadNamePrefix)); return regionOpenAndInitThreadPool; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java index f5a858ab93a..c99fcc19a2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java @@ -124,9 +124,9 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager { public SimpleSubprocedurePool(String name, Configuration conf) { this.name = name; - executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new DaemonThreadFactory("rs(" + name + ")-procedure-pool")); + executor = new ThreadPoolExecutor(1, 1, 500, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + new DaemonThreadFactory("rs(" + name + ")-procedure-pool-")); taskPool = new ExecutorCompletionService<>(executor); } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java index bc4d9d5ad6e..122675c67f1 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java @@ -26,10 +26,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Table; @@ -130,27 +128,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { } - static class DaemonThreadFactory implements ThreadFactory { - static final AtomicInteger poolNumber = new AtomicInteger(1); - final ThreadGroup group; - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix; - - DaemonThreadFactory() { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-"; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - if (!t.isDaemon()) t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); - return t; - } - } - private final LongAdder failedIncrements = new LongAdder(); private final LongAdder successfulCoalescings = new LongAdder(); private final LongAdder totalIncrements = new LongAdder(); @@ -168,10 +145,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { public IncrementCoalescer(ThriftHBaseServiceHandler hand) { this.handler = hand; LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - pool = - new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, - Threads.newDaemonThreadFactory("IncrementCoalescer")); - + pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, + TimeUnit.MILLISECONDS, queue, + Threads.newDaemonThreadFactory("IncrementCoalescer")); MBeans.register("thrift", "Thrift", this); }