HBASE-22883 Duplacate codes of method Threads.newDaemonThreadFactory() and class DaemonThreadFactory (#537)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
5106f2826e
commit
3e2cfc1140
|
@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -64,8 +64,8 @@ public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
|
|||
this.name = name;
|
||||
executor =
|
||||
new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs(" + name
|
||||
+ ")-backup-pool"));
|
||||
new LinkedBlockingQueue<>(),
|
||||
Threads.newDaemonThreadFactory("rs(" + name + ")-backup"));
|
||||
taskPool = new ExecutorCompletionService<>(executor);
|
||||
}
|
||||
|
||||
|
|
|
@ -197,6 +197,11 @@ public class Threads {
|
|||
return boundedCachedThreadPool;
|
||||
}
|
||||
|
||||
public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
|
||||
TimeUnit unit, String prefix) {
|
||||
return getBoundedCachedThreadPool(maxCachedThread, timeout, unit,
|
||||
newDaemonThreadFactory(prefix));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.hbase.chaos.policies;
|
||||
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.chaos.actions.Action;
|
||||
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -42,7 +42,7 @@ public class TwoConcurrentActionPolicy extends PeriodicPolicy {
|
|||
this.actionsOne = actionsOne;
|
||||
this.actionsTwo = actionsTwo;
|
||||
executor = Executors.newFixedThreadPool(2,
|
||||
new DaemonThreadFactory("TwoConcurrentAction-"));
|
||||
Threads.newDaemonThreadFactory("TwoConcurrentAction"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,51 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Thread factory that creates daemon threads
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DaemonThreadFactory implements ThreadFactory {
|
||||
private static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
private final ThreadGroup group;
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
private final String namePrefix;
|
||||
|
||||
public DaemonThreadFactory(String name) {
|
||||
SecurityManager s = System.getSecurityManager();
|
||||
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
|
||||
namePrefix = name + poolNumber.getAndIncrement() + "-thread-";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
||||
if (!t.isDaemon()) {
|
||||
t.setDaemon(true);
|
||||
}
|
||||
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
||||
t.setPriority(Thread.NORM_PRIORITY);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
|
@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -66,7 +66,7 @@ public class FifoRpcScheduler extends RpcScheduler {
|
|||
60,
|
||||
TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue<>(maxQueueLength),
|
||||
new DaemonThreadFactory("FifoRpcScheduler.handler"),
|
||||
Threads.newDaemonThreadFactory("FifoRpcScheduler.handler"),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -72,11 +72,11 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler {
|
|||
rsRsreportMaxQueueLength);
|
||||
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue<Runnable>(maxQueueLength),
|
||||
new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"),
|
||||
Threads.newDaemonThreadFactory("MasterFifoRpcScheduler.call.handler"),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
|
||||
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength),
|
||||
new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"),
|
||||
Threads.newDaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
}
|
||||
|
||||
|
|
|
@ -17,13 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -51,10 +50,7 @@ public class DirScanPool implements ConfigurationObserver {
|
|||
}
|
||||
|
||||
private static ThreadPoolExecutor initializePool(int size) {
|
||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
|
||||
new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
|
||||
executor.allowCoreThreadTimeOut(true);
|
||||
return executor;
|
||||
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES, "dir-scan");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,14 +29,14 @@ import java.util.concurrent.SynchronousQueue;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
|
||||
|
||||
/**
|
||||
|
@ -113,7 +113,7 @@ public class ProcedureCoordinator {
|
|||
long keepAliveMillis) {
|
||||
return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
|
||||
Threads.newDaemonThreadFactory("(" + coordName + ")-proc-coordinator"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -26,11 +26,11 @@ import java.util.concurrent.SynchronousQueue;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
|
||||
|
||||
|
@ -87,7 +87,7 @@ public class ProcedureMember implements Closeable {
|
|||
long keepAliveMillis) {
|
||||
return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
|
||||
Threads.newDaemonThreadFactory("member: '" + memberName + "' subprocedure"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -215,7 +214,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
|
|||
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
|
||||
this.name = name;
|
||||
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
|
||||
new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-"));
|
||||
"rs(" + name + ")-flush-proc");
|
||||
taskPool = new ExecutorCompletionService<>(executor);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -284,7 +283,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
|
|||
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
|
||||
this.name = name;
|
||||
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
|
||||
new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-"));
|
||||
"rs(" + name + ")-snapshot");
|
||||
taskPool = new ExecutorCompletionService<>(executor);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,9 @@
|
|||
package org.apache.hadoop.hbase.security.access;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -69,7 +69,7 @@ public class ZKPermissionWatcher extends ZKListener implements Closeable {
|
|||
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
|
||||
this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
|
||||
executor = Executors.newSingleThreadExecutor(
|
||||
new DaemonThreadFactory("zk-permission-watcher"));
|
||||
Threads.newDaemonThreadFactory("zk-permission-watcher"));
|
||||
}
|
||||
|
||||
public void start() throws KeeperException {
|
||||
|
|
|
@ -31,9 +31,9 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
|
@ -127,7 +127,7 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
|
|||
this.name = name;
|
||||
executor = new ThreadPoolExecutor(1, 1, 500,
|
||||
TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
|
||||
new DaemonThreadFactory("rs(" + name + ")-procedure-pool-"));
|
||||
Threads.newDaemonThreadFactory("rs(" + name + ")-procedure"));
|
||||
taskPool = new ExecutorCompletionService<>(executor);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue