HBASE-19505 Disable ByteBufferPool by default at HM.
This commit is contained in:
parent
1d8a601166
commit
a2dbb42284
|
@ -42,6 +42,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
|
@ -59,7 +60,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||||
* An RPC server with Netty4 implementation.
|
* An RPC server with Netty4 implementation.
|
||||||
* @since 2.0.0
|
* @since 2.0.0
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
|
||||||
public class NettyRpcServer extends RpcServer {
|
public class NettyRpcServer extends RpcServer {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
|
public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
|
||||||
|
@ -71,9 +72,9 @@ public class NettyRpcServer extends RpcServer {
|
||||||
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
||||||
|
|
||||||
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
|
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
|
||||||
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler)
|
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
|
||||||
throws IOException {
|
boolean reservoirEnabled) throws IOException {
|
||||||
super(server, name, services, bindAddress, conf, scheduler);
|
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
|
||||||
this.bindAddress = bindAddress;
|
this.bindAddress = bindAddress;
|
||||||
EventLoopGroup eventLoopGroup;
|
EventLoopGroup eventLoopGroup;
|
||||||
Class<? extends ServerChannel> channelClass;
|
Class<? extends ServerChannel> channelClass;
|
||||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
|
@ -67,7 +66,6 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
||||||
|
@ -83,8 +81,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHea
|
||||||
* An RPC server that hosts protobuf described Services.
|
* An RPC server that hosts protobuf described Services.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public abstract class RpcServer implements RpcServerInterface,
|
public abstract class RpcServer implements RpcServerInterface,
|
||||||
ConfigurationObserver {
|
ConfigurationObserver {
|
||||||
// LOG is being used in CallRunner and the log level is being changed in tests
|
// LOG is being used in CallRunner and the log level is being changed in tests
|
||||||
|
@ -255,13 +252,13 @@ public abstract class RpcServer implements RpcServerInterface,
|
||||||
* @param bindAddress Where to listen
|
* @param bindAddress Where to listen
|
||||||
* @param conf
|
* @param conf
|
||||||
* @param scheduler
|
* @param scheduler
|
||||||
|
* @param reservoirEnabled Enable ByteBufferPool or not.
|
||||||
*/
|
*/
|
||||||
public RpcServer(final Server server, final String name,
|
public RpcServer(final Server server, final String name,
|
||||||
final List<BlockingServiceAndInterface> services,
|
final List<BlockingServiceAndInterface> services,
|
||||||
final InetSocketAddress bindAddress, Configuration conf,
|
final InetSocketAddress bindAddress, Configuration conf,
|
||||||
RpcScheduler scheduler)
|
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
|
||||||
throws IOException {
|
if (reservoirEnabled) {
|
||||||
if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
|
|
||||||
int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
|
int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
|
||||||
ByteBufferPool.DEFAULT_BUFFER_SIZE);
|
ByteBufferPool.DEFAULT_BUFFER_SIZE);
|
||||||
// The max number of buffers to be pooled in the ByteBufferPool. The default value been
|
// The max number of buffers to be pooled in the ByteBufferPool. The default value been
|
||||||
|
|
|
@ -43,10 +43,16 @@ public class RpcServerFactory {
|
||||||
private RpcServerFactory() {
|
private RpcServerFactory() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static RpcServer createRpcServer(final Server server, final String name,
|
||||||
|
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
|
||||||
|
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||||
|
return createRpcServer(server, name, services, bindAddress, conf, scheduler, true);
|
||||||
|
}
|
||||||
|
|
||||||
public static RpcServer createRpcServer(final Server server, final String name,
|
public static RpcServer createRpcServer(final Server server, final String name,
|
||||||
final List<BlockingServiceAndInterface> services,
|
final List<BlockingServiceAndInterface> services,
|
||||||
final InetSocketAddress bindAddress, Configuration conf,
|
final InetSocketAddress bindAddress, Configuration conf,
|
||||||
RpcScheduler scheduler) throws IOException {
|
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
|
||||||
String rpcServerClass = conf.get(CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
|
String rpcServerClass = conf.get(CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
|
||||||
NettyRpcServer.class.getName());
|
NettyRpcServer.class.getName());
|
||||||
StringBuilder servicesList = new StringBuilder();
|
StringBuilder servicesList = new StringBuilder();
|
||||||
|
@ -59,7 +65,7 @@ public class RpcServerFactory {
|
||||||
LOG.info("Creating " + rpcServerClass + " hosting " + servicesList);
|
LOG.info("Creating " + rpcServerClass + " hosting " + servicesList);
|
||||||
return ReflectionUtils.instantiateWithCustomCtor(rpcServerClass,
|
return ReflectionUtils.instantiateWithCustomCtor(rpcServerClass,
|
||||||
new Class[] { Server.class, String.class, List.class,
|
new Class[] { Server.class, String.class, List.class,
|
||||||
InetSocketAddress.class, Configuration.class, RpcScheduler.class },
|
InetSocketAddress.class, Configuration.class, RpcScheduler.class, boolean.class },
|
||||||
new Object[] { server, name, services, bindAddress, conf, scheduler });
|
new Object[] { server, name, services, bindAddress, conf, scheduler, reservoirEnabled });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,7 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -35,10 +33,8 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
|
||||||
|
|
||||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public interface RpcServerInterface {
|
public interface RpcServerInterface {
|
||||||
void start();
|
void start();
|
||||||
boolean isStarted();
|
boolean isStarted();
|
||||||
|
|
|
@ -43,10 +43,10 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
||||||
|
@ -81,8 +81,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFa
|
||||||
*
|
*
|
||||||
* @see BlockingRpcClient
|
* @see BlockingRpcClient
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public class SimpleRpcServer extends RpcServer {
|
public class SimpleRpcServer extends RpcServer {
|
||||||
|
|
||||||
protected int port; // port we listen on
|
protected int port; // port we listen on
|
||||||
|
@ -375,13 +374,13 @@ public class SimpleRpcServer extends RpcServer {
|
||||||
* @param bindAddress Where to listen
|
* @param bindAddress Where to listen
|
||||||
* @param conf
|
* @param conf
|
||||||
* @param scheduler
|
* @param scheduler
|
||||||
|
* @param reservoirEnabled Enable ByteBufferPool or not.
|
||||||
*/
|
*/
|
||||||
public SimpleRpcServer(final Server server, final String name,
|
public SimpleRpcServer(final Server server, final String name,
|
||||||
final List<BlockingServiceAndInterface> services,
|
final List<BlockingServiceAndInterface> services,
|
||||||
final InetSocketAddress bindAddress, Configuration conf,
|
final InetSocketAddress bindAddress, Configuration conf,
|
||||||
RpcScheduler scheduler)
|
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
|
||||||
throws IOException {
|
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
|
||||||
super(server, name, services, bindAddress, conf, scheduler);
|
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
|
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
|
||||||
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.BindException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -31,10 +33,12 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
import org.apache.hadoop.hbase.ServerLoad;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -54,6 +58,8 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||||
import org.apache.hadoop.hbase.ipc.QosPriority;
|
import org.apache.hadoop.hbase.ipc.QosPriority;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||||
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
||||||
|
@ -72,6 +78,7 @@ import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
|
@ -311,6 +318,24 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
master = m;
|
master = m;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
|
||||||
|
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
|
||||||
|
throws IOException {
|
||||||
|
// RpcServer at HM by default enable ByteBufferPool iff HM having user table region in it
|
||||||
|
boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY,
|
||||||
|
(LoadBalancer.isTablesOnMaster(conf) && !LoadBalancer.isSystemTablesOnlyOnMaster(conf)));
|
||||||
|
try {
|
||||||
|
return RpcServerFactory.createRpcServer(server, name, getServices(),
|
||||||
|
bindAddress, // use final bindAddress for this server.
|
||||||
|
conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
|
||||||
|
} catch (BindException be) {
|
||||||
|
throw new IOException(be.getMessage() + ". To switch ports use the '"
|
||||||
|
+ HConstants.MASTER_PORT + "' configuration property.",
|
||||||
|
be.getCause() != null ? be.getCause() : be);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected PriorityFunction createPriority() {
|
protected PriorityFunction createPriority() {
|
||||||
return new MasterAnnotationReadingPriorityFunction(this);
|
return new MasterAnnotationReadingPriorityFunction(this);
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
|
@ -260,6 +261,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
*/
|
*/
|
||||||
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
|
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
|
||||||
|
|
||||||
|
protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
|
||||||
|
|
||||||
// Request counter. (Includes requests that are not serviced by regions.)
|
// Request counter. (Includes requests that are not serviced by regions.)
|
||||||
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
|
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
|
||||||
final LongAdder requestCount = new LongAdder();
|
final LongAdder requestCount = new LongAdder();
|
||||||
|
@ -1171,19 +1174,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
String name = rs.getProcessName() + "/" + initialIsa.toString();
|
String name = rs.getProcessName() + "/" + initialIsa.toString();
|
||||||
// Set how many times to retry talking to another server over Connection.
|
// Set how many times to retry talking to another server over Connection.
|
||||||
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
|
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
|
||||||
try {
|
rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
|
||||||
rpcServer = RpcServerFactory.createRpcServer(rs, name, getServices(),
|
rpcServer.setRsRpcServices(this);
|
||||||
bindAddress, // use final bindAddress for this server.
|
|
||||||
rs.conf,
|
|
||||||
rpcSchedulerFactory.create(rs.conf, this, rs));
|
|
||||||
rpcServer.setRsRpcServices(this);
|
|
||||||
} catch (BindException be) {
|
|
||||||
String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
|
|
||||||
HConstants.REGIONSERVER_PORT;
|
|
||||||
throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
|
|
||||||
"' configuration property.", be.getCause() != null ? be.getCause() : be);
|
|
||||||
}
|
|
||||||
|
|
||||||
scannerLeaseTimeoutPeriod = rs.conf.getInt(
|
scannerLeaseTimeoutPeriod = rs.conf.getInt(
|
||||||
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||||
|
@ -1210,6 +1202,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
.expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
|
.expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
|
||||||
|
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
|
||||||
|
throws IOException {
|
||||||
|
boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true);
|
||||||
|
try {
|
||||||
|
return RpcServerFactory.createRpcServer(server, name, getServices(),
|
||||||
|
bindAddress, // use final bindAddress for this server.
|
||||||
|
conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
|
||||||
|
} catch (BindException be) {
|
||||||
|
throw new IOException(be.getMessage() + ". To switch ports use the '"
|
||||||
|
+ HConstants.REGIONSERVER_PORT + "' configuration property.",
|
||||||
|
be.getCause() != null ? be.getCause() : be);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration newConf) {
|
public void onConfigurationChange(Configuration newConf) {
|
||||||
if (rpcServer instanceof ConfigurationObserver) {
|
if (rpcServer instanceof ConfigurationObserver) {
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class TestBlockingIPC extends AbstractTestIPC {
|
||||||
TestFailingRpcServer(Server server, String name,
|
TestFailingRpcServer(Server server, String name,
|
||||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||||
super(server, name, services, bindAddress, conf, scheduler);
|
super(server, name, services, bindAddress, conf, scheduler, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
final class FailingConnection extends SimpleServerRpcConnection {
|
final class FailingConnection extends SimpleServerRpcConnection {
|
||||||
|
|
|
@ -106,7 +106,7 @@ public class TestNettyIPC extends AbstractTestIPC {
|
||||||
protected RpcServer createRpcServer(Server server, String name,
|
protected RpcServer createRpcServer(Server server, String name,
|
||||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||||
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler);
|
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -145,7 +145,7 @@ public class TestNettyIPC extends AbstractTestIPC {
|
||||||
TestFailingRpcServer(Server server, String name,
|
TestFailingRpcServer(Server server, String name,
|
||||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||||
super(server, name, services, bindAddress, conf, scheduler);
|
super(server, name, services, bindAddress, conf, scheduler, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
final class FailingConnection extends NettyServerRpcConnection {
|
final class FailingConnection extends NettyServerRpcConnection {
|
||||||
|
|
Loading…
Reference in New Issue