From 4f88a3c4bb6181192b1fdfc0bca02835585245ef Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 23 Jun 2022 15:13:29 -0700 Subject: [PATCH] HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525) Support site configuration of the bytebuf allocator that Netty will use for NettyRpcServer channels. Property name is 'hbase.netty.rpcserver.allocator'. Default is no value, which is equivalent to "pooled". Valid values are: - "pooled": use PooledByteBufAllocator - "unpooled": use UnpooledByteBufAllocator - "heap": use HeapByteBufAllocator, which is a PooledByteBufAllocator that preferentially allocates buffers on heap wherever possible - : If the value is none of the recognized labels, treat it as a class name implementing org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator. This allows the user to add a custom implementation, perhaps for debugging. Also updates ReflectionUtils with a new helper method. Signed-off-by: Viraj Jasani --- .../hadoop/hbase/util/ReflectionUtils.java | 25 +++++++++ .../hbase/ipc/HeapByteBufAllocator.java | 56 +++++++++++++++++++ .../hadoop/hbase/ipc/NettyRpcServer.java | 53 +++++++++++++++++- .../hbase/ipc/SimpleByteBufAllocator.java | 52 +++++++++++++++++ .../hadoop/hbase/ipc/TestNettyRpcServer.java | 29 +++++++--- 5 files changed, 205 insertions(+), 10 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java index 5f6500fb2a5..547d28cfa88 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java @@ -63,6 +63,31 @@ public class ReflectionUtils { } } + @SuppressWarnings("unchecked") + public static T newInstance(String className, Object... params) { + Class type; + try { + type = (Class) getClassLoader().loadClass(className); + } catch (ClassNotFoundException | ClassCastException e) { + throw new UnsupportedOperationException("Unable to load specified class " + className, e); + } + return instantiate(type.getName(), findConstructor(type, params), params); + } + + public static ClassLoader getClassLoader() { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (cl == null) { + cl = ReflectionUtils.class.getClassLoader(); + } + if (cl == null) { + cl = ClassLoader.getSystemClassLoader(); + } + if (cl == null) { + throw new RuntimeException("A ClassLoader could not be found"); + } + return cl; + } + public static T newInstance(Class type, Object... params) { return instantiate(type.getName(), findConstructor(type, params), params); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java new file mode 100644 index 00000000000..fd5ada50955 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; + +/** + * A pooled ByteBufAllocator that does not prefer direct buffers regardless of platform settings. + *

+ * In some cases direct buffers are still required, like IO buffers where the buffer will be used in + * conjunction with a native method call, so we cannot force all buffer usage on heap. But we can + * strongly prefer it. + */ +@InterfaceAudience.Private +public class HeapByteBufAllocator extends AbstractByteBufAllocator { + + public static final HeapByteBufAllocator DEFAULT = new HeapByteBufAllocator(); + + private final PooledByteBufAllocator delegate = + new PooledByteBufAllocator(false /* preferDirect */); + + @Override + public boolean isDirectBufferPooled() { + return delegate.isDirectBufferPooled(); + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + return delegate.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + return delegate.directBuffer(initialCapacity, maxCapacity); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 9c8319944e7..e2578ec1575 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -29,12 +29,16 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; @@ -65,18 +69,35 @@ public class NettyRpcServer extends RpcServer { "hbase.netty.eventloop.rpcserver.thread.count"; private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0; + /** + * Name of property to change the byte buf allocator for the netty channels. Default is no value, + * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled", + * and "heap", or, the name of a class implementing ByteBufAllocator. + *

+ * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is + * controlled by platform specific code and documented system properties. + *

+ * "heap" will prefer heap arena allocations. + */ + public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator"; + static final String POOLED_ALLOCATOR_TYPE = "pooled"; + static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; + static final String HEAP_ALLOCATOR_TYPE = "heap"; + private final InetSocketAddress bindAddress; private final CountDownLatch closed = new CountDownLatch(1); private final Channel serverChannel; private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); + private final ByteBufAllocator channelAllocator; public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); this.bindAddress = bindAddress; + this.channelAllocator = getChannelAllocator(conf); EventLoopGroup eventLoopGroup; Class channelClass; if (server instanceof HRegionServer) { @@ -97,9 +118,9 @@ public class NettyRpcServer extends RpcServer { .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) .childOption(ChannelOption.SO_REUSEADDR, true) .childHandler(new ChannelInitializer() { - @Override protected void initChannel(Channel ch) throws Exception { + ch.config().setAllocator(channelAllocator); ChannelPipeline pipeline = ch.pipeline(); FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); preambleDecoder.setSingleDecode(true); @@ -120,6 +141,36 @@ public class NettyRpcServer extends RpcServer { this.scheduler.init(new RpcSchedulerContext(this)); } + private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException { + final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY); + if (value != null) { + if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { + LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); + return PooledByteBufAllocator.DEFAULT; + } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { + LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName()); + return UnpooledByteBufAllocator.DEFAULT; + } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { + LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName()); + return HeapByteBufAllocator.DEFAULT; + } else { + // If the value is none of the recognized labels, treat it as a class name. This allows the + // user to supply a custom implementation, perhaps for debugging. + try { + // ReflectionUtils throws UnsupportedOperationException if there are any problems. + ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value); + LOG.info("Using {} for buffer allocation", value); + return alloc; + } catch (ClassCastException | UnsupportedOperationException e) { + throw new IOException(e); + } + } + } else { + LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); + return PooledByteBufAllocator.DEFAULT; + } + } + @InterfaceAudience.Private protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { return new NettyRpcServerPreambleHandler(NettyRpcServer.this); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java new file mode 100644 index 00000000000..1796539eb0e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; + +/** + * A custom byte buf allocator for TestNettyRpcServer. + */ +public class SimpleByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocator { + + static final Logger LOG = LoggerFactory.getLogger(SimpleByteBufAllocator.class); + + @Override + public boolean isDirectBufferPooled() { + return false; + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + LOG.info("newHeapBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity); + return Unpooled.buffer(initialCapacity, maxCapacity); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + LOG.info("newDirectBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity); + return Unpooled.directBuffer(initialCapacity, maxCapacity); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java index b1db8778146..eefadaf528c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -33,16 +35,19 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; @Category({ RPCTests.class, MediumTests.class }) +@RunWith(Parameterized.class) public class TestNettyRpcServer { @ClassRule @@ -57,22 +62,28 @@ public class TestNettyRpcServer { private static byte[] FAMILY = Bytes.toBytes("f1"); private static byte[] PRIVATE_COL = Bytes.toBytes("private"); private static byte[] PUBLIC_COL = Bytes.toBytes("public"); + @Parameterized.Parameter + public String allocatorType; - @Before - public void setup() { - TABLE = TableName.valueOf(name.getMethodName()); + @Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { { NettyRpcServer.POOLED_ALLOCATOR_TYPE }, + { NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE }, { NettyRpcServer.HEAP_ALLOCATOR_TYPE }, + { SimpleByteBufAllocator.class.getName() } }); } - @BeforeClass - public static void setupBeforeClass() throws Exception { + @Before + public void setup() throws Exception { + TABLE = TableName.valueOf(name.getMethodName().replace('[', '_').replace(']', '_')); TEST_UTIL = new HBaseTestingUtil(); TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class.getName()); + TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType); TEST_UTIL.startMiniCluster(); } - @AfterClass - public static void tearDownAfterClass() throws Exception { + @After + public void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); }