HBASE-11492 Hadoop configuration overrides some ipc parameters including tcpNoDelay
This commit is contained in:
parent
c5dc88791b
commit
d8faab2fff
|
@ -181,14 +181,14 @@ possible configurations would overwhelm and obscure the important.
|
||||||
Same property is used by the Master for count of master handlers.</description>
|
Same property is used by the Master for count of master handlers.</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>ipc.server.callqueue.handler.factor</name>
|
<name>hbase.ipc.server.callqueue.handler.factor</name>
|
||||||
<value>0.1</value>
|
<value>0.1</value>
|
||||||
<description>Factor to determine the number of call queues.
|
<description>Factor to determine the number of call queues.
|
||||||
A value of 0 means a single queue shared between all the handlers.
|
A value of 0 means a single queue shared between all the handlers.
|
||||||
A value of 1 means that each handler has its own queue.</description>
|
A value of 1 means that each handler has its own queue.</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>ipc.server.callqueue.read.share</name>
|
<name>hbase.ipc.server.callqueue.read.share</name>
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
<description>Split the call queues into read and write queues.
|
<description>Split the call queues into read and write queues.
|
||||||
A value of 0 indicate to not split the call queues.
|
A value of 0 indicate to not split the call queues.
|
||||||
|
|
|
@ -39,7 +39,7 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
|
|
||||||
public FifoRpcScheduler(Configuration conf, int handlerCount) {
|
public FifoRpcScheduler(Configuration conf, int handlerCount) {
|
||||||
this.handlerCount = handlerCount;
|
this.handlerCount = handlerCount;
|
||||||
this.maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
|
this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
|
||||||
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -516,7 +516,7 @@ public class RpcServer implements RpcServerInterface {
|
||||||
|
|
||||||
public Listener(final String name) throws IOException {
|
public Listener(final String name) throws IOException {
|
||||||
super(name);
|
super(name);
|
||||||
backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
|
backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
|
||||||
// Create a new server socket and set to non blocking mode
|
// Create a new server socket and set to non blocking mode
|
||||||
acceptChannel = ServerSocketChannel.open();
|
acceptChannel = ServerSocketChannel.open();
|
||||||
acceptChannel.configureBlocking(false);
|
acceptChannel.configureBlocking(false);
|
||||||
|
@ -1704,7 +1704,7 @@ public class RpcServer implements RpcServerInterface {
|
||||||
responder, totalRequestSize, null);
|
responder, totalRequestSize, null);
|
||||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||||
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
|
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
|
||||||
"Call queue is full, is ipc.server.max.callqueue.size too small?");
|
"Call queue is full, is hbase.ipc.server.max.callqueue.size too small?");
|
||||||
responder.doRespond(callTooBig);
|
responder.doRespond(callTooBig);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1868,12 +1868,12 @@ public class RpcServer implements RpcServerInterface {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
this.maxQueueSize =
|
this.maxQueueSize =
|
||||||
this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
|
this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
|
||||||
this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
|
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
|
||||||
this.maxIdleTime = 2 * conf.getInt("ipc.client.connection.maxidletime", 1000);
|
this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
|
||||||
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
|
||||||
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
|
this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
|
||||||
this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
|
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
||||||
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
||||||
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
|
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
|
||||||
|
@ -1883,8 +1883,8 @@ public class RpcServer implements RpcServerInterface {
|
||||||
this.port = listener.getAddress().getPort();
|
this.port = listener.getAddress().getPort();
|
||||||
|
|
||||||
this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
|
this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
|
||||||
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
|
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
|
||||||
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
|
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
|
||||||
|
|
||||||
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
|
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
|
||||||
this.delayedCalls = new AtomicInteger(0);
|
this.delayedCalls = new AtomicInteger(0);
|
||||||
|
|
|
@ -38,17 +38,18 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||||
public class SimpleRpcScheduler extends RpcScheduler {
|
public class SimpleRpcScheduler extends RpcScheduler {
|
||||||
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
|
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
|
||||||
|
|
||||||
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
|
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share";
|
||||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
||||||
"ipc.server.callqueue.handler.factor";
|
"hbase.ipc.server.callqueue.handler.factor";
|
||||||
|
|
||||||
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
|
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
|
||||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type";
|
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||||
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
||||||
|
|
||||||
/** max delay in msec used to bound the deprioritized requests */
|
/** max delay in msec used to bound the deprioritized requests */
|
||||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "ipc.server.queue.max.call.delay";
|
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
|
||||||
|
= "hbase.ipc.server.queue.max.call.delay";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
|
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
|
||||||
|
@ -104,7 +105,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
|
||||||
int replicationHandlerCount,
|
int replicationHandlerCount,
|
||||||
PriorityFunction priority,
|
PriorityFunction priority,
|
||||||
int highPriorityLevel) {
|
int highPriorityLevel) {
|
||||||
int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
|
int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
|
||||||
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
this.highPriorityLevel = highPriorityLevel;
|
this.highPriorityLevel = highPriorityLevel;
|
||||||
|
|
|
@ -71,7 +71,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
|
LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
|
||||||
|
|
||||||
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
|
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
|
||||||
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "ipc.server.scan.vtime.weight";
|
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
|
||||||
|
|
||||||
private final Map<String, Integer> annotatedQos;
|
private final Map<String, Integer> annotatedQos;
|
||||||
//We need to mock the regionserver instance for some unit tests (set via
|
//We need to mock the regionserver instance for some unit tests (set via
|
||||||
|
|
|
@ -614,7 +614,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
+ busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
|
+ busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
|
||||||
+ maxBusyWaitMultiplier + "). Their product should be positive");
|
+ maxBusyWaitMultiplier + "). Their product should be positive");
|
||||||
}
|
}
|
||||||
this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
|
this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
||||||
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class TestFullLogReconstruction {
|
||||||
c.setInt("dfs.heartbeat.interval", 1);
|
c.setInt("dfs.heartbeat.interval", 1);
|
||||||
c.setInt("dfs.client.socket-timeout", 5000);
|
c.setInt("dfs.client.socket-timeout", 5000);
|
||||||
// faster failover with cluster.shutdown();fs.close() idiom
|
// faster failover with cluster.shutdown();fs.close() idiom
|
||||||
c.setInt("ipc.client.connect.max.retries", 1);
|
c.setInt("hbase.ipc.client.connect.max.retries", 1);
|
||||||
c.setInt("dfs.client.block.recovery.retries", 1);
|
c.setInt("dfs.client.block.recovery.retries", 1);
|
||||||
c.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
|
c.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
|
||||||
TEST_UTIL.startMiniCluster(3);
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
|
|
@ -135,11 +135,11 @@ public class TestHLog {
|
||||||
|
|
||||||
// faster failover with cluster.shutdown();fs.close() idiom
|
// faster failover with cluster.shutdown();fs.close() idiom
|
||||||
TEST_UTIL.getConfiguration()
|
TEST_UTIL.getConfiguration()
|
||||||
.setInt("ipc.client.connect.max.retries", 1);
|
.setInt("hbase.ipc.client.connect.max.retries", 1);
|
||||||
TEST_UTIL.getConfiguration().setInt(
|
TEST_UTIL.getConfiguration().setInt(
|
||||||
"dfs.client.block.recovery.retries", 1);
|
"dfs.client.block.recovery.retries", 1);
|
||||||
TEST_UTIL.getConfiguration().setInt(
|
TEST_UTIL.getConfiguration().setInt(
|
||||||
"ipc.client.connection.maxidletime", 500);
|
"hbase.ipc.client.connection.maxidletime", 500);
|
||||||
TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
|
TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
|
||||||
SampleRegionWALObserver.class.getName());
|
SampleRegionWALObserver.class.getName());
|
||||||
TEST_UTIL.startMiniDFSCluster(3);
|
TEST_UTIL.startMiniDFSCluster(3);
|
||||||
|
|
|
@ -121,7 +121,6 @@ public class TestLogRolling {
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
|
||||||
|
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
|
||||||
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
|
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
|
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
|
||||||
|
|
||||||
// For less frequently updated regions flush after every 2 flushes
|
// For less frequently updated regions flush after every 2 flushes
|
||||||
|
|
|
@ -36,7 +36,7 @@ module Hbase
|
||||||
self.configuration = org.apache.hadoop.hbase.HBaseConfiguration.create
|
self.configuration = org.apache.hadoop.hbase.HBaseConfiguration.create
|
||||||
# Turn off retries in hbase and ipc. Human doesn't want to wait on N retries.
|
# Turn off retries in hbase and ipc. Human doesn't want to wait on N retries.
|
||||||
configuration.setInt("hbase.client.retries.number", 7)
|
configuration.setInt("hbase.client.retries.number", 7)
|
||||||
configuration.setInt("ipc.client.connect.max.retries", 3)
|
configuration.setInt("hbase.ipc.client.connect.max.retries", 3)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1309,7 +1309,7 @@ index e70ebc6..96f8c27 100644
|
||||||
requests to user tables. The rule of thumb is to keep this number low when the payload per
|
requests to user tables. The rule of thumb is to keep this number low when the payload per
|
||||||
request approaches the MB (big puts, scans using a large cache) and high when the payload
|
request approaches the MB (big puts, scans using a large cache) and high when the payload
|
||||||
is small (gets, small puts, ICVs, deletes). The total size of the queries in progress is
|
is small (gets, small puts, ICVs, deletes). The total size of the queries in progress is
|
||||||
limited by the setting "ipc.server.max.callqueue.size". </para>
|
limited by the setting "hbase.ipc.server.max.callqueue.size". </para>
|
||||||
<para> It is safe to set that number to the maximum number of incoming clients if their
|
<para> It is safe to set that number to the maximum number of incoming clients if their
|
||||||
payload is small, the typical example being a cluster that serves a website since puts
|
payload is small, the typical example being a cluster that serves a website since puts
|
||||||
aren't typically buffered and most of the operations are gets. </para>
|
aren't typically buffered and most of the operations are gets. </para>
|
||||||
|
|
Loading…
Reference in New Issue