HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)
Support site configuration of the bytebuf allocator that Netty will use for NettyRpcServer channels. Property name is 'hbase.netty.rpcserver.allocator'. Default is no value, which is equivalent to "pooled". Valid values are: - "pooled": use PooledByteBufAllocator - "unpooled": use UnpooledByteBufAllocator - "heap": use HeapByteBufAllocator, which is a PooledByteBufAllocator that preferentially allocates buffers on heap wherever possible - <class>: If the value is none of the recognized labels, treat it as a class name implementing org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator. This allows the user to add a custom implementation, perhaps for debugging. Also updates ReflectionUtils with a new helper method. Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
b8558d30d0
commit
4f88a3c4bb
|
@ -63,6 +63,31 @@ public class ReflectionUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <T> T newInstance(String className, Object... params) {
|
||||||
|
Class<T> type;
|
||||||
|
try {
|
||||||
|
type = (Class<T>) getClassLoader().loadClass(className);
|
||||||
|
} catch (ClassNotFoundException | ClassCastException e) {
|
||||||
|
throw new UnsupportedOperationException("Unable to load specified class " + className, e);
|
||||||
|
}
|
||||||
|
return instantiate(type.getName(), findConstructor(type, params), params);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClassLoader getClassLoader() {
|
||||||
|
ClassLoader cl = Thread.currentThread().getContextClassLoader();
|
||||||
|
if (cl == null) {
|
||||||
|
cl = ReflectionUtils.class.getClassLoader();
|
||||||
|
}
|
||||||
|
if (cl == null) {
|
||||||
|
cl = ClassLoader.getSystemClassLoader();
|
||||||
|
}
|
||||||
|
if (cl == null) {
|
||||||
|
throw new RuntimeException("A ClassLoader could not be found");
|
||||||
|
}
|
||||||
|
return cl;
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> T newInstance(Class<T> type, Object... params) {
|
public static <T> T newInstance(Class<T> type, Object... params) {
|
||||||
return instantiate(type.getName(), findConstructor(type, params), params);
|
return instantiate(type.getName(), findConstructor(type, params), params);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A pooled ByteBufAllocator that does not prefer direct buffers regardless of platform settings.
|
||||||
|
* <p>
|
||||||
|
* In some cases direct buffers are still required, like IO buffers where the buffer will be used in
|
||||||
|
* conjunction with a native method call, so we cannot force all buffer usage on heap. But we can
|
||||||
|
* strongly prefer it.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class HeapByteBufAllocator extends AbstractByteBufAllocator {
|
||||||
|
|
||||||
|
public static final HeapByteBufAllocator DEFAULT = new HeapByteBufAllocator();
|
||||||
|
|
||||||
|
private final PooledByteBufAllocator delegate =
|
||||||
|
new PooledByteBufAllocator(false /* preferDirect */);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDirectBufferPooled() {
|
||||||
|
return delegate.isDirectBufferPooled();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
|
||||||
|
return delegate.heapBuffer(initialCapacity, maxCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
|
||||||
|
return delegate.directBuffer(initialCapacity, maxCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -29,12 +29,16 @@ import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||||
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
|
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
|
||||||
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
|
import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
||||||
|
@ -65,18 +69,35 @@ public class NettyRpcServer extends RpcServer {
|
||||||
"hbase.netty.eventloop.rpcserver.thread.count";
|
"hbase.netty.eventloop.rpcserver.thread.count";
|
||||||
private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
|
private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Name of property to change the byte buf allocator for the netty channels. Default is no value,
|
||||||
|
* which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
|
||||||
|
* and "heap", or, the name of a class implementing ByteBufAllocator.
|
||||||
|
* <p>
|
||||||
|
* "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
|
||||||
|
* controlled by platform specific code and documented system properties.
|
||||||
|
* <p>
|
||||||
|
* "heap" will prefer heap arena allocations.
|
||||||
|
*/
|
||||||
|
public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
|
||||||
|
static final String POOLED_ALLOCATOR_TYPE = "pooled";
|
||||||
|
static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
|
||||||
|
static final String HEAP_ALLOCATOR_TYPE = "heap";
|
||||||
|
|
||||||
private final InetSocketAddress bindAddress;
|
private final InetSocketAddress bindAddress;
|
||||||
|
|
||||||
private final CountDownLatch closed = new CountDownLatch(1);
|
private final CountDownLatch closed = new CountDownLatch(1);
|
||||||
private final Channel serverChannel;
|
private final Channel serverChannel;
|
||||||
private final ChannelGroup allChannels =
|
private final ChannelGroup allChannels =
|
||||||
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
|
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
|
||||||
|
private final ByteBufAllocator channelAllocator;
|
||||||
|
|
||||||
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
|
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
|
||||||
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
|
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
|
||||||
boolean reservoirEnabled) throws IOException {
|
boolean reservoirEnabled) throws IOException {
|
||||||
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
|
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
|
||||||
this.bindAddress = bindAddress;
|
this.bindAddress = bindAddress;
|
||||||
|
this.channelAllocator = getChannelAllocator(conf);
|
||||||
EventLoopGroup eventLoopGroup;
|
EventLoopGroup eventLoopGroup;
|
||||||
Class<? extends ServerChannel> channelClass;
|
Class<? extends ServerChannel> channelClass;
|
||||||
if (server instanceof HRegionServer) {
|
if (server instanceof HRegionServer) {
|
||||||
|
@ -97,9 +118,9 @@ public class NettyRpcServer extends RpcServer {
|
||||||
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
|
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
|
||||||
.childOption(ChannelOption.SO_REUSEADDR, true)
|
.childOption(ChannelOption.SO_REUSEADDR, true)
|
||||||
.childHandler(new ChannelInitializer<Channel>() {
|
.childHandler(new ChannelInitializer<Channel>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
ch.config().setAllocator(channelAllocator);
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
|
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
|
||||||
preambleDecoder.setSingleDecode(true);
|
preambleDecoder.setSingleDecode(true);
|
||||||
|
@ -120,6 +141,36 @@ public class NettyRpcServer extends RpcServer {
|
||||||
this.scheduler.init(new RpcSchedulerContext(this));
|
this.scheduler.init(new RpcSchedulerContext(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
|
||||||
|
final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
|
||||||
|
if (value != null) {
|
||||||
|
if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
|
||||||
|
LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
|
||||||
|
return PooledByteBufAllocator.DEFAULT;
|
||||||
|
} else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
|
||||||
|
LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
|
||||||
|
return UnpooledByteBufAllocator.DEFAULT;
|
||||||
|
} else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
|
||||||
|
LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
|
||||||
|
return HeapByteBufAllocator.DEFAULT;
|
||||||
|
} else {
|
||||||
|
// If the value is none of the recognized labels, treat it as a class name. This allows the
|
||||||
|
// user to supply a custom implementation, perhaps for debugging.
|
||||||
|
try {
|
||||||
|
// ReflectionUtils throws UnsupportedOperationException if there are any problems.
|
||||||
|
ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
|
||||||
|
LOG.info("Using {} for buffer allocation", value);
|
||||||
|
return alloc;
|
||||||
|
} catch (ClassCastException | UnsupportedOperationException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
|
||||||
|
return PooledByteBufAllocator.DEFAULT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
|
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
|
||||||
return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
|
return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A custom byte buf allocator for TestNettyRpcServer.
|
||||||
|
*/
|
||||||
|
public class SimpleByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocator {
|
||||||
|
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(SimpleByteBufAllocator.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDirectBufferPooled() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
|
||||||
|
LOG.info("newHeapBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
|
||||||
|
return Unpooled.buffer(initialCapacity, maxCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
|
||||||
|
LOG.info("newDirectBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
|
||||||
|
return Unpooled.directBuffer(initialCapacity, maxCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
|
@ -33,16 +35,19 @@ import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.AfterClass;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
@Category({ RPCTests.class, MediumTests.class })
|
@Category({ RPCTests.class, MediumTests.class })
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestNettyRpcServer {
|
public class TestNettyRpcServer {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
|
@ -57,22 +62,28 @@ public class TestNettyRpcServer {
|
||||||
private static byte[] FAMILY = Bytes.toBytes("f1");
|
private static byte[] FAMILY = Bytes.toBytes("f1");
|
||||||
private static byte[] PRIVATE_COL = Bytes.toBytes("private");
|
private static byte[] PRIVATE_COL = Bytes.toBytes("private");
|
||||||
private static byte[] PUBLIC_COL = Bytes.toBytes("public");
|
private static byte[] PUBLIC_COL = Bytes.toBytes("public");
|
||||||
|
@Parameterized.Parameter
|
||||||
|
public String allocatorType;
|
||||||
|
|
||||||
@Before
|
@Parameters
|
||||||
public void setup() {
|
public static Collection<Object[]> parameters() {
|
||||||
TABLE = TableName.valueOf(name.getMethodName());
|
return Arrays.asList(new Object[][] { { NettyRpcServer.POOLED_ALLOCATOR_TYPE },
|
||||||
|
{ NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE }, { NettyRpcServer.HEAP_ALLOCATOR_TYPE },
|
||||||
|
{ SimpleByteBufAllocator.class.getName() } });
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@Before
|
||||||
public static void setupBeforeClass() throws Exception {
|
public void setup() throws Exception {
|
||||||
|
TABLE = TableName.valueOf(name.getMethodName().replace('[', '_').replace(']', '_'));
|
||||||
TEST_UTIL = new HBaseTestingUtil();
|
TEST_UTIL = new HBaseTestingUtil();
|
||||||
TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
|
TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
|
||||||
NettyRpcServer.class.getName());
|
NettyRpcServer.class.getName());
|
||||||
|
TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType);
|
||||||
TEST_UTIL.startMiniCluster();
|
TEST_UTIL.startMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@After
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue