diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 4282a8faa49..91c468f197a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -42,6 +42,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -59,7 +60,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; * An RPC server with Netty4 implementation. * @since 2.0.0 */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG}) public class NettyRpcServer extends RpcServer { public static final Log LOG = LogFactory.getLog(NettyRpcServer.class); @@ -71,9 +72,9 @@ public class NettyRpcServer extends RpcServer { private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public NettyRpcServer(Server server, String name, List services, - InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) - throws IOException { - super(server, name, services, bindAddress, conf, scheduler); + InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, + boolean reservoirEnabled) throws IOException { + super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); this.bindAddress = bindAddress; EventLoopGroup eventLoopGroup; Class channelClass; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b1abb77682f..43af98836a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -42,7 +42,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -67,7 +66,6 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; @@ -83,8 +81,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHea * An RPC server that hosts protobuf described Services. * */ -@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -@InterfaceStability.Evolving +@InterfaceAudience.Private public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver { // LOG is being used in CallRunner and the log level is being changed in tests @@ -255,13 +252,13 @@ public abstract class RpcServer implements RpcServerInterface, * @param bindAddress Where to listen * @param conf * @param scheduler + * @param reservoirEnabled Enable ByteBufferPool or not. */ public RpcServer(final Server server, final String name, final List services, final InetSocketAddress bindAddress, Configuration conf, - RpcScheduler scheduler) - throws IOException { - if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) { + RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { + if (reservoirEnabled) { int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE); // The max number of buffers to be pooled in the ByteBufferPool. The default value been diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java index 67ad666846a..b1b047d711a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java @@ -43,10 +43,16 @@ public class RpcServerFactory { private RpcServerFactory() { } + public static RpcServer createRpcServer(final Server server, final String name, + final List services, final InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + return createRpcServer(server, name, services, bindAddress, conf, scheduler, true); + } + public static RpcServer createRpcServer(final Server server, final String name, final List services, final InetSocketAddress bindAddress, Configuration conf, - RpcScheduler scheduler) throws IOException { + RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { String rpcServerClass = conf.get(CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class.getName()); StringBuilder servicesList = new StringBuilder(); @@ -59,7 +65,7 @@ public class RpcServerFactory { LOG.info("Creating " + rpcServerClass + " hosting " + servicesList); return ReflectionUtils.instantiateWithCustomCtor(rpcServerClass, new Class[] { Server.class, String.class, List.class, - InetSocketAddress.class, Configuration.class, RpcScheduler.class }, - new Object[] { server, name, services, bindAddress, conf, scheduler }); + InetSocketAddress.class, Configuration.class, RpcScheduler.class, boolean.class }, + new Object[] { server, name, services, bindAddress, conf, scheduler, reservoirEnabled }); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 730b82b98ce..d970adccd52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -23,9 +23,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.util.Pair; @@ -35,10 +33,8 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -@InterfaceStability.Evolving +@InterfaceAudience.Private public interface RpcServerInterface { void start(); boolean isStarted(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index 36ae74a0608..d0d9abc50c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -43,10 +43,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; @@ -81,8 +81,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFa * * @see BlockingRpcClient */ -@InterfaceAudience.Private -@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG}) public class SimpleRpcServer extends RpcServer { protected int port; // port we listen on @@ -375,13 +374,13 @@ public class SimpleRpcServer extends RpcServer { * @param bindAddress Where to listen * @param conf * @param scheduler + * @param reservoirEnabled Enable ByteBufferPool or not. */ public SimpleRpcServer(final Server server, final String name, final List services, final InetSocketAddress bindAddress, Configuration conf, - RpcScheduler scheduler) - throws IOException { - super(server, name, services, bindAddress, conf, scheduler); + RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { + super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); this.socketSendBufferSize = 0; this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10); this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index ce85b66cb47..e0f2b27ffb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.net.BindException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -31,10 +33,12 @@ import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -54,6 +58,8 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.locking.LockProcedure; @@ -72,6 +78,7 @@ import org.apache.hadoop.hbase.quotas.QuotaObserverChore; import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; @@ -311,6 +318,24 @@ public class MasterRpcServices extends RSRpcServices master = m; } + @Override + protected RpcServerInterface createRpcServer(Server server, Configuration conf, + RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) + throws IOException { + // RpcServer at HM by default enable ByteBufferPool iff HM having user table region in it + boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, + (LoadBalancer.isTablesOnMaster(conf) && !LoadBalancer.isSystemTablesOnlyOnMaster(conf))); + try { + return RpcServerFactory.createRpcServer(server, name, getServices(), + bindAddress, // use final bindAddress for this server. + conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); + } catch (BindException be) { + throw new IOException(be.getMessage() + ". To switch ports use the '" + + HConstants.MASTER_PORT + "' configuration property.", + be.getCause() != null ? be.getCause() : be); + } + } + @Override protected PriorityFunction createPriority() { return new MasterAnnotationReadingPriorityFunction(this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b9d2557dd29..bc3dc6a2df2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; @@ -260,6 +261,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; + protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled"; + // Request counter. (Includes requests that are not serviced by regions.) // Count only once for requests with multiple actions like multi/caching-scan/replayBatch final LongAdder requestCount = new LongAdder(); @@ -1171,19 +1174,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, String name = rs.getProcessName() + "/" + initialIsa.toString(); // Set how many times to retry talking to another server over Connection. ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); - try { - rpcServer = RpcServerFactory.createRpcServer(rs, name, getServices(), - bindAddress, // use final bindAddress for this server. - rs.conf, - rpcSchedulerFactory.create(rs.conf, this, rs)); - rpcServer.setRsRpcServices(this); - } catch (BindException be) { - String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT : - HConstants.REGIONSERVER_PORT; - throw new IOException(be.getMessage() + ". To switch ports use the '" + configName + - "' configuration property.", be.getCause() != null ? be.getCause() : be); - } - + rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name); + rpcServer.setRsRpcServices(this); scannerLeaseTimeoutPeriod = rs.conf.getInt( HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); @@ -1210,6 +1202,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); } + protected RpcServerInterface createRpcServer(Server server, Configuration conf, + RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) + throws IOException { + boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true); + try { + return RpcServerFactory.createRpcServer(server, name, getServices(), + bindAddress, // use final bindAddress for this server. + conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); + } catch (BindException be) { + throw new IOException(be.getMessage() + ". To switch ports use the '" + + HConstants.REGIONSERVER_PORT + "' configuration property.", + be.getCause() != null ? be.getCause() : be); + } + } + @Override public void onConfigurationChange(Configuration newConf) { if (rpcServer instanceof ConfigurationObserver) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java index 9c91d1d39fa..8fbdafd2d36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java @@ -73,7 +73,7 @@ public class TestBlockingIPC extends AbstractTestIPC { TestFailingRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { - super(server, name, services, bindAddress, conf, scheduler); + super(server, name, services, bindAddress, conf, scheduler, true); } final class FailingConnection extends SimpleServerRpcConnection { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java index aa534d46cc5..ef90bd6dfd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -106,7 +106,7 @@ public class TestNettyIPC extends AbstractTestIPC { protected RpcServer createRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { - return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler); + return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true); } @Override @@ -145,7 +145,7 @@ public class TestNettyIPC extends AbstractTestIPC { TestFailingRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { - super(server, name, services, bindAddress, conf, scheduler); + super(server, name, services, bindAddress, conf, scheduler, true); } final class FailingConnection extends NettyServerRpcConnection {