HBASE-27195 Clean up netty worker/thread pool configuration (#4619)
The configuration settings "hbase.netty.worker.count" and "hbase.netty.eventloop.rpcserver.thread.count" appear to duplicate each other. Also, formalizes another setting found in NettyEventLoopGroupConfig, "hbase.netty.nativetransport". Also, native epoll is not limited to amd64. aarch64 supports it too. Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
5af4b3d5c8
commit
745142adb1
|
@ -42,12 +42,15 @@ public class JVM {
|
||||||
|
|
||||||
private static final boolean ibmvendor =
|
private static final boolean ibmvendor =
|
||||||
System.getProperty("java.vendor") != null && System.getProperty("java.vendor").contains("IBM");
|
System.getProperty("java.vendor") != null && System.getProperty("java.vendor").contains("IBM");
|
||||||
private static final boolean windows =
|
// At least on my systems os.name reports as "linux", not "Linux". Prefer case insensitive tests.
|
||||||
System.getProperty("os.name") != null && System.getProperty("os.name").startsWith("Windows");
|
private static final boolean windows = System.getProperty("os.name") != null
|
||||||
private static final boolean linux =
|
&& System.getProperty("os.name").toLowerCase().contains("windows");
|
||||||
System.getProperty("os.name") != null && System.getProperty("os.name").startsWith("Linux");
|
private static final boolean linux = System.getProperty("os.name") != null
|
||||||
|
&& System.getProperty("os.name").toLowerCase().contains("linux");
|
||||||
private static final boolean amd64 =
|
private static final boolean amd64 =
|
||||||
System.getProperty("os.arch") != null && System.getProperty("os.arch").contains("amd64");
|
System.getProperty("os.arch") != null && System.getProperty("os.arch").contains("amd64");
|
||||||
|
private static final boolean aarch64 =
|
||||||
|
System.getProperty("os.arch") != null && System.getProperty("os.arch").contains("aarch64");
|
||||||
|
|
||||||
private static final String JVMVersion = System.getProperty("java.version");
|
private static final String JVMVersion = System.getProperty("java.version");
|
||||||
|
|
||||||
|
@ -99,6 +102,14 @@ public class JVM {
|
||||||
return amd64;
|
return amd64;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the arch is aarch64;
|
||||||
|
* @return whether this is aarch64 or not.
|
||||||
|
*/
|
||||||
|
public static boolean isAarch64() {
|
||||||
|
return aarch64;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the finish() method of GZIPOutputStream is broken
|
* Check if the finish() method of GZIPOutputStream is broken
|
||||||
* @return whether GZIPOutputStream.finish() is broken.
|
* @return whether GZIPOutputStream.finish() is broken.
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HBaseServerBase;
|
import org.apache.hadoop.hbase.HBaseServerBase;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|
||||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||||
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
|
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
|
@ -47,10 +46,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
|
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
|
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
|
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
|
import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -61,14 +57,6 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
|
||||||
public class NettyRpcServer extends RpcServer {
|
public class NettyRpcServer extends RpcServer {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
|
public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* Name of property to change netty rpc server eventloop thread count. Default is 0. Tests may set
|
|
||||||
* this down from unlimited.
|
|
||||||
*/
|
|
||||||
public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY =
|
|
||||||
"hbase.netty.eventloop.rpcserver.thread.count";
|
|
||||||
private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of property to change the byte buf allocator for the netty channels. Default is no value,
|
* Name of property to change the byte buf allocator for the netty channels. Default is no value,
|
||||||
* which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
|
* which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
|
||||||
|
@ -97,21 +85,16 @@ public class NettyRpcServer extends RpcServer {
|
||||||
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
|
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
|
||||||
this.bindAddress = bindAddress;
|
this.bindAddress = bindAddress;
|
||||||
this.channelAllocator = getChannelAllocator(conf);
|
this.channelAllocator = getChannelAllocator(conf);
|
||||||
EventLoopGroup eventLoopGroup;
|
// Get the event loop group configuration from the server class if available.
|
||||||
Class<? extends ServerChannel> channelClass;
|
NettyEventLoopGroupConfig config = null;
|
||||||
if (server instanceof HRegionServer) {
|
if (server instanceof HBaseServerBase) {
|
||||||
NettyEventLoopGroupConfig config = ((HBaseServerBase) server).getEventLoopGroupConfig();
|
config = ((HBaseServerBase) server).getEventLoopGroupConfig();
|
||||||
eventLoopGroup = config.group();
|
|
||||||
channelClass = config.serverChannelClass();
|
|
||||||
} else {
|
|
||||||
int threadCount = server == null
|
|
||||||
? EVENTLOOP_THREADCOUNT_DEFAULT
|
|
||||||
: server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
|
|
||||||
EVENTLOOP_THREADCOUNT_DEFAULT);
|
|
||||||
eventLoopGroup = new NioEventLoopGroup(threadCount,
|
|
||||||
new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
|
|
||||||
channelClass = NioServerSocketChannel.class;
|
|
||||||
}
|
}
|
||||||
|
if (config == null) {
|
||||||
|
config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
|
||||||
|
}
|
||||||
|
EventLoopGroup eventLoopGroup = config.group();
|
||||||
|
Class<? extends ServerChannel> channelClass = config.serverChannelClass();
|
||||||
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
|
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
|
||||||
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
|
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
|
||||||
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
|
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
|
||||||
|
|
|
@ -39,6 +39,13 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class NettyEventLoopGroupConfig {
|
public class NettyEventLoopGroupConfig {
|
||||||
|
|
||||||
|
public static final String NETTY_WORKER_COUNT_KEY = "hbase.netty.worker.count";
|
||||||
|
public static final int DEFAULT_NETTY_WORKER_COUNT = 0;
|
||||||
|
|
||||||
|
public static final String NETTY_NATIVETRANSPORT_KEY = "hbase.netty.nativetransport";
|
||||||
|
public static final boolean DEFAULT_NETTY_NATIVETRANSPORT = true;
|
||||||
|
|
||||||
private final EventLoopGroup group;
|
private final EventLoopGroup group;
|
||||||
|
|
||||||
private final Class<? extends ServerChannel> serverChannelClass;
|
private final Class<? extends ServerChannel> serverChannelClass;
|
||||||
|
@ -47,14 +54,21 @@ public class NettyEventLoopGroupConfig {
|
||||||
|
|
||||||
private static boolean useEpoll(Configuration conf) {
|
private static boolean useEpoll(Configuration conf) {
|
||||||
// Config to enable native transport.
|
// Config to enable native transport.
|
||||||
boolean epollEnabled = conf.getBoolean("hbase.netty.nativetransport", true);
|
final boolean epollEnabled =
|
||||||
// Use the faster native epoll transport mechanism on linux if enabled
|
conf.getBoolean(NETTY_NATIVETRANSPORT_KEY, DEFAULT_NETTY_NATIVETRANSPORT);
|
||||||
return epollEnabled && JVM.isLinux() && JVM.isAmd64();
|
// Use the faster native epoll transport mechanism on linux if enabled and the
|
||||||
|
// hardware architecture is either amd64 or aarch64. Netty is known to have native
|
||||||
|
// epoll support for these combinations.
|
||||||
|
return epollEnabled && JVM.isLinux() && (JVM.isAmd64() || JVM.isAarch64());
|
||||||
}
|
}
|
||||||
|
|
||||||
public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) {
|
public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) {
|
||||||
boolean useEpoll = useEpoll(conf);
|
final boolean useEpoll = useEpoll(conf);
|
||||||
int workerCount = conf.getInt("hbase.netty.worker.count", 0);
|
final int workerCount = conf.getInt(NETTY_WORKER_COUNT_KEY,
|
||||||
|
// For backwards compatibility we also need to consider
|
||||||
|
// "hbase.netty.eventloop.rpcserver.thread.count"
|
||||||
|
// if it is defined in site configuration instead.
|
||||||
|
conf.getInt("hbase.netty.eventloop.rpcserver.thread.count", DEFAULT_NETTY_WORKER_COUNT));
|
||||||
ThreadFactory eventLoopThreadFactory =
|
ThreadFactory eventLoopThreadFactory =
|
||||||
new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY);
|
new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY);
|
||||||
if (useEpoll) {
|
if (useEpoll) {
|
||||||
|
|
|
@ -277,9 +277,4 @@
|
||||||
<value>3</value>
|
<value>3</value>
|
||||||
<description>Default is unbounded</description>
|
<description>Default is unbounded</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
|
||||||
<name>hbase.netty.eventloop.rpcserver.thread.count</name>
|
|
||||||
<value>3</value>
|
|
||||||
<description>Default is unbounded</description>
|
|
||||||
</property>
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
Loading…
Reference in New Issue