Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
9b9f055ef7
commit
84b4a61e88
@ -807,7 +807,8 @@ public class TestClientNoCluster extends Configured implements Tool {
|
|||||||
|
|
||||||
// Have them all share the same connection so they all share the same instance of
|
// Have them all share the same connection so they all share the same instance of
|
||||||
// ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
|
// ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
|
||||||
final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
|
final ExecutorService pool = Executors.newCachedThreadPool(
|
||||||
|
Threads.newDaemonThreadFactory("p"));
|
||||||
// Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
|
// Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
|
||||||
// Share a connection so I can keep counts in the 'server' on concurrency.
|
// Share a connection so I can keep counts in the 'server' on concurrency.
|
||||||
final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
|
final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
|
||||||
|
@ -204,7 +204,7 @@ public class Threads {
|
|||||||
* @param prefix The prefix of every created Thread's name
|
* @param prefix The prefix of every created Thread's name
|
||||||
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
||||||
*/
|
*/
|
||||||
public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
private static ThreadFactory getNamedThreadFactory(final String prefix) {
|
||||||
SecurityManager s = System.getSecurityManager();
|
SecurityManager s = System.getSecurityManager();
|
||||||
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
|
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
|
||||||
.getThreadGroup();
|
.getThreadGroup();
|
||||||
|
@ -226,7 +226,9 @@ public class HFileArchiver {
|
|||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
|
final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
|
||||||
return new Thread(r, name);
|
Thread t = new Thread(r, name);
|
||||||
|
t.setDaemon(true);
|
||||||
|
return t;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -23,21 +23,21 @@ import java.util.List;
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockType;
|
import org.apache.hadoop.hbase.procedure2.LockType;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The mob compaction thread used in {@link MasterRpcServices}
|
* The mob compaction thread used in {@link MasterRpcServices}
|
||||||
@ -55,14 +55,11 @@ public class MasterMobCompactionThread {
|
|||||||
this.conf = master.getConfiguration();
|
this.conf = master.getConfiguration();
|
||||||
final String n = Thread.currentThread().getName();
|
final String n = Thread.currentThread().getName();
|
||||||
// this pool is used to run the mob compaction
|
// this pool is used to run the mob compaction
|
||||||
this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
|
this.masterMobPool = new ThreadPoolExecutor(1, 2, 60,
|
||||||
new SynchronousQueue<>(), new ThreadFactory() {
|
TimeUnit.SECONDS, new SynchronousQueue<>(),
|
||||||
@Override
|
new ThreadFactoryBuilder().setDaemon(true)
|
||||||
public Thread newThread(Runnable r) {
|
.setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime())
|
||||||
String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime();
|
.build());
|
||||||
return new Thread(r, name);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
|
((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
|
||||||
// this pool is used in the mob compaction to compact the mob files by partitions
|
// this pool is used in the mob compaction to compact the mob files by partitions
|
||||||
// in parallel
|
// in parallel
|
||||||
|
@ -675,7 +675,7 @@ public class SplitTableRegionProcedure
|
|||||||
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
|
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
|
||||||
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
|
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
|
||||||
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
|
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
|
||||||
Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
|
Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d"));
|
||||||
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
|
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
|
||||||
|
|
||||||
// Split each store file.
|
// Split each store file.
|
||||||
|
@ -25,11 +25,9 @@ import java.util.concurrent.Callable;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||||
@ -47,7 +45,9 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
|
|||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -213,10 +213,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
|
|||||||
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
|
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
|
||||||
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
|
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
|
||||||
this.name = name;
|
this.name = name;
|
||||||
executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
|
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
|
||||||
new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
|
new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-"));
|
||||||
+ name + ")-flush-proc-pool"));
|
|
||||||
executor.allowCoreThreadTimeOut(true);
|
|
||||||
taskPool = new ExecutorCompletionService<>(executor);
|
taskPool = new ExecutorCompletionService<>(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.RejectedExecutionHandler;
|
import java.util.concurrent.RejectedExecutionHandler;
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -56,8 +55,10 @@ import org.apache.hadoop.util.StringUtils;
|
|||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compact region on request and then run split if appropriate
|
* Compact region on request and then run split if appropriate
|
||||||
@ -118,14 +119,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||||||
private void createSplitExcecutors() {
|
private void createSplitExcecutors() {
|
||||||
final String n = Thread.currentThread().getName();
|
final String n = Thread.currentThread().getName();
|
||||||
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
||||||
this.splits =
|
this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
|
||||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
|
new ThreadFactoryBuilder().setNameFormat(n + "-splits-" + System.currentTimeMillis())
|
||||||
@Override
|
.setDaemon(true).build());
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
String name = n + "-splits-" + System.currentTimeMillis();
|
|
||||||
return new Thread(r, name);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createCompactionExecutors() {
|
private void createCompactionExecutors() {
|
||||||
@ -144,24 +140,16 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||||||
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
|
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
|
||||||
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
|
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
|
||||||
TimeUnit.SECONDS, stealJobQueue,
|
TimeUnit.SECONDS, stealJobQueue,
|
||||||
new ThreadFactory() {
|
new ThreadFactoryBuilder()
|
||||||
@Override
|
.setNameFormat(n + "-longCompactions-" + System.currentTimeMillis())
|
||||||
public Thread newThread(Runnable r) {
|
.setDaemon(true).build());
|
||||||
String name = n + "-longCompactions-" + System.currentTimeMillis();
|
|
||||||
return new Thread(r, name);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
||||||
this.longCompactions.prestartAllCoreThreads();
|
this.longCompactions.prestartAllCoreThreads();
|
||||||
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
|
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
|
||||||
TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
|
TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
|
||||||
new ThreadFactory() {
|
new ThreadFactoryBuilder()
|
||||||
@Override
|
.setNameFormat(n + "-shortCompactions-" + System.currentTimeMillis())
|
||||||
public Thread newThread(Runnable r) {
|
.setDaemon(true).build());
|
||||||
String name = n + "-shortCompactions-" + System.currentTimeMillis();
|
|
||||||
return new Thread(r, name);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
|
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +26,6 @@ import java.util.concurrent.Callable;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DaemonThreadFactory;
|
|||||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
@ -283,10 +283,8 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
|
|||||||
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
|
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
|
||||||
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
|
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
|
||||||
this.name = name;
|
this.name = name;
|
||||||
executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
|
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
|
||||||
new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
|
new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-"));
|
||||||
+ name + ")-snapshot-pool"));
|
|
||||||
executor.allowCoreThreadTimeOut(true);
|
|
||||||
taskPool = new ExecutorCompletionService<>(executor);
|
taskPool = new ExecutorCompletionService<>(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,7 +227,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||||||
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
|
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
|
||||||
// spinning as other strategies do.
|
// spinning as other strategies do.
|
||||||
this.disruptor = new Disruptor<>(RingBufferTruck::new,
|
this.disruptor = new Disruptor<>(RingBufferTruck::new,
|
||||||
getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"),
|
getPreallocatedEventCount(),
|
||||||
|
Threads.newDaemonThreadFactory(hostingThreadName + ".append"),
|
||||||
ProducerType.MULTI, new BlockingWaitStrategy());
|
ProducerType.MULTI, new BlockingWaitStrategy());
|
||||||
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
|
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
|
||||||
// because SyncFuture.NOT_DONE = 0.
|
// because SyncFuture.NOT_DONE = 0.
|
||||||
|
@ -32,7 +32,6 @@ import java.util.concurrent.CompletionService;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
|||||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
|
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
@ -64,6 +64,7 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
|
||||||
|
|
||||||
@ -141,9 +142,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||||||
// per sink thread pool
|
// per sink thread pool
|
||||||
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
||||||
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
||||||
this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
|
this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingQueue<>());
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
|
||||||
this.exec.allowCoreThreadTimeOut(true);
|
|
||||||
this.abortable = ctx.getAbortable();
|
this.abortable = ctx.getAbortable();
|
||||||
// Set the size limit for replication RPCs to 95% of the max request size.
|
// Set the size limit for replication RPCs to 95% of the max request size.
|
||||||
// We could do with less slop if we have an accurate estimate of encoded size. Being
|
// We could do with less slop if we have an accurate estimate of encoded size. Being
|
||||||
|
@ -10,8 +10,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
@ -27,7 +25,6 @@ import java.util.Map.Entry;
|
|||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -39,9 +36,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
@ -53,6 +47,12 @@ import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
|||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It is used for replicating HFile entries. It will first copy parallely all the hfiles to a local
|
* It is used for replicating HFile entries. It will first copy parallely all the hfiles to a local
|
||||||
@ -105,12 +105,9 @@ public class HFileReplicator {
|
|||||||
this.maxCopyThreads =
|
this.maxCopyThreads =
|
||||||
this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
|
this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
|
||||||
REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
|
REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
|
||||||
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
|
this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
|
||||||
builder.setNameFormat("HFileReplicationCallable-%1$d");
|
new ThreadFactoryBuilder().setDaemon(true)
|
||||||
this.exec =
|
.setNameFormat("HFileReplicationCallable-%1$d").build());
|
||||||
new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
|
|
||||||
new LinkedBlockingQueue<>(), builder.build());
|
|
||||||
this.exec.allowCoreThreadTimeOut(true);
|
|
||||||
this.copiesPerThread =
|
this.copiesPerThread =
|
||||||
conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
|
conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
|
||||||
REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
|
REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
|
||||||
|
@ -196,8 +196,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||||||
int nbWorkers = conf.getInt("replication.executor.workers", 1);
|
int nbWorkers = conf.getInt("replication.executor.workers", 1);
|
||||||
// use a short 100ms sleep since this could be done inline with a RS startup
|
// use a short 100ms sleep since this could be done inline with a RS startup
|
||||||
// even if we fail, other region servers can take care of it
|
// even if we fail, other region servers can take care of it
|
||||||
this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
|
this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
|
||||||
new LinkedBlockingQueue<>());
|
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
|
||||||
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
||||||
tfb.setNameFormat("ReplicationExecutor-%d");
|
tfb.setNameFormat("ReplicationExecutor-%d");
|
||||||
tfb.setDaemon(true);
|
tfb.setDaemon(true);
|
||||||
|
@ -571,7 +571,7 @@ public final class SnapshotManifest {
|
|||||||
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
|
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
|
||||||
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
|
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
|
||||||
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
||||||
Threads.getNamedThreadFactory(name));
|
Threads.newDaemonThreadFactory(name));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -108,7 +108,7 @@ public class HFileContentValidator extends AbstractHBaseTool {
|
|||||||
int availableProcessors = Runtime.getRuntime().availableProcessors();
|
int availableProcessors = Runtime.getRuntime().availableProcessors();
|
||||||
int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
|
int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
|
||||||
return Executors.newFixedThreadPool(numThreads,
|
return Executors.newFixedThreadPool(numThreads,
|
||||||
Threads.getNamedThreadFactory("hfile-validator"));
|
Threads.newDaemonThreadFactory("hfile-validator"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1623,7 +1623,8 @@ public abstract class FSUtils extends CommonFSUtils {
|
|||||||
// run in multiple threads
|
// run in multiple threads
|
||||||
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
|
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
|
||||||
threadPoolSize, 60, TimeUnit.SECONDS,
|
threadPoolSize, 60, TimeUnit.SECONDS,
|
||||||
new ArrayBlockingQueue<>(statusList.length));
|
new ArrayBlockingQueue<>(statusList.length),
|
||||||
|
Threads.newDaemonThreadFactory("FSRegionQuery"));
|
||||||
try {
|
try {
|
||||||
// ignore all file status items that are not of interest
|
// ignore all file status items that are not of interest
|
||||||
for (FileStatus regionStatus : statusList) {
|
for (FileStatus regionStatus : statusList) {
|
||||||
|
@ -28,7 +28,6 @@ import java.util.concurrent.Callable;
|
|||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -235,14 +234,7 @@ public abstract class ModifyRegionUtils {
|
|||||||
"hbase.hregion.open.and.init.threads.max", 16));
|
"hbase.hregion.open.and.init.threads.max", 16));
|
||||||
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
|
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
|
||||||
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
||||||
new ThreadFactory() {
|
Threads.newDaemonThreadFactory(threadNamePrefix));
|
||||||
private int count = 1;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
return new Thread(r, threadNamePrefix + "-" + count++);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return regionOpenAndInitThreadPool;
|
return regionOpenAndInitThreadPool;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,9 +124,9 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
|
|||||||
|
|
||||||
public SimpleSubprocedurePool(String name, Configuration conf) {
|
public SimpleSubprocedurePool(String name, Configuration conf) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
|
executor = new ThreadPoolExecutor(1, 1, 500,
|
||||||
new LinkedBlockingQueue<>(),
|
TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
|
||||||
new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
|
new DaemonThreadFactory("rs(" + name + ")-procedure-pool-"));
|
||||||
taskPool = new ExecutorCompletionService<>(executor);
|
taskPool = new ExecutorCompletionService<>(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,10 +26,8 @@ import java.util.concurrent.Callable;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
@ -130,27 +128,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class DaemonThreadFactory implements ThreadFactory {
|
|
||||||
static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
||||||
final ThreadGroup group;
|
|
||||||
final AtomicInteger threadNumber = new AtomicInteger(1);
|
|
||||||
final String namePrefix;
|
|
||||||
|
|
||||||
DaemonThreadFactory() {
|
|
||||||
SecurityManager s = System.getSecurityManager();
|
|
||||||
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
|
|
||||||
namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
|
||||||
if (!t.isDaemon()) t.setDaemon(true);
|
|
||||||
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final LongAdder failedIncrements = new LongAdder();
|
private final LongAdder failedIncrements = new LongAdder();
|
||||||
private final LongAdder successfulCoalescings = new LongAdder();
|
private final LongAdder successfulCoalescings = new LongAdder();
|
||||||
private final LongAdder totalIncrements = new LongAdder();
|
private final LongAdder totalIncrements = new LongAdder();
|
||||||
@ -168,10 +145,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||||||
public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
|
public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
|
||||||
this.handler = hand;
|
this.handler = hand;
|
||||||
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
|
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
|
||||||
pool =
|
pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50,
|
||||||
new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
|
TimeUnit.MILLISECONDS, queue,
|
||||||
Threads.newDaemonThreadFactory("IncrementCoalescer"));
|
Threads.newDaemonThreadFactory("IncrementCoalescer"));
|
||||||
|
|
||||||
MBeans.register("thrift", "Thrift", this);
|
MBeans.register("thrift", "Thrift", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user