HBASE-7008 Set scanner caching to a better default, disable Nagles
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1417233 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e5618b053
commit
16084720bb
|
@ -548,6 +548,16 @@ public final class HConstants {
|
||||||
*/
|
*/
|
||||||
public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10;
|
public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parameter name to set the default scanner caching for all clients.
|
||||||
|
*/
|
||||||
|
public static String HBASE_CLIENT_SCANNER_CACHING = "hbase.client.scanner.caching";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default value for {@link #HBASE_CLIENT_SCANNER_CACHING}
|
||||||
|
*/
|
||||||
|
public static int DEFAULT_HBASE_CLIENT_SCANNER_CACHING = 100;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parameter name for number of rows that will be fetched when calling next on
|
* Parameter name for number of rows that will be fetched when calling next on
|
||||||
* a scanner if it is not served from memory. Higher caching values will
|
* a scanner if it is not served from memory. Higher caching values will
|
||||||
|
|
|
@ -121,7 +121,9 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
if (this.scan.getCaching() > 0) {
|
if (this.scan.getCaching() > 0) {
|
||||||
this.caching = this.scan.getCaching();
|
this.caching = this.scan.getCaching();
|
||||||
} else {
|
} else {
|
||||||
this.caching = conf.getInt("hbase.client.scanner.caching", 1);
|
this.caching = conf.getInt(
|
||||||
|
HConstants.HBASE_CLIENT_SCANNER_CACHING,
|
||||||
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize the scanner
|
// initialize the scanner
|
||||||
|
|
|
@ -258,7 +258,8 @@ public class HTable implements HTableInterface {
|
||||||
this.autoFlush = true;
|
this.autoFlush = true;
|
||||||
this.currentWriteBufferSize = 0;
|
this.currentWriteBufferSize = 0;
|
||||||
this.scannerCaching = this.configuration.getInt(
|
this.scannerCaching = this.configuration.getInt(
|
||||||
"hbase.client.scanner.caching", 1);
|
HConstants.HBASE_CLIENT_SCANNER_CACHING,
|
||||||
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||||
|
|
||||||
this.maxKeyValueSize = this.configuration.getInt(
|
this.maxKeyValueSize = this.configuration.getInt(
|
||||||
"hbase.client.keyvalue.maxsize", -1);
|
"hbase.client.keyvalue.maxsize", -1);
|
||||||
|
|
|
@ -1157,7 +1157,7 @@ public class HBaseClient {
|
||||||
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
|
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
|
||||||
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
|
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
|
||||||
this.failureSleep = conf.getInt("hbase.client.pause", 1000);
|
this.failureSleep = conf.getInt("hbase.client.pause", 1000);
|
||||||
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", false);
|
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
|
||||||
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
|
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
|
||||||
this.pingInterval = getPingInterval(conf);
|
this.pingInterval = getPingInterval(conf);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
|
|
@ -1938,7 +1938,7 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
this.port = listener.getAddress().getPort();
|
this.port = listener.getAddress().getPort();
|
||||||
this.rpcMetrics = new HBaseRpcMetrics(
|
this.rpcMetrics = new HBaseRpcMetrics(
|
||||||
serverName, Integer.toString(this.port));
|
serverName, Integer.toString(this.port));
|
||||||
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
|
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
|
||||||
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
|
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
|
||||||
|
|
||||||
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
|
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
|
||||||
|
|
|
@ -133,7 +133,7 @@
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hbase.client.scanner.caching</name>
|
<name>hbase.client.scanner.caching</name>
|
||||||
<value>1</value>
|
<value>100</value>
|
||||||
<description>Number of rows that will be fetched when calling next
|
<description>Number of rows that will be fetched when calling next
|
||||||
on a scanner if it is not served from (local, client) memory. Higher
|
on a scanner if it is not served from (local, client) memory. Higher
|
||||||
caching values will enable faster scanners but will eat up more memory
|
caching values will enable faster scanners but will eat up more memory
|
||||||
|
@ -528,6 +528,13 @@
|
||||||
used for client / server RPC call marshalling.
|
used for client / server RPC call marshalling.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.ipc.client.tcpnodelay</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>Set no delay on rpc socket connections. See
|
||||||
|
http://docs.oracle.com/javase/1.5.0/docs/api/java/net/Socket.html#getTcpNoDelay()
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!-- The following properties configure authentication information for
|
<!-- The following properties configure authentication information for
|
||||||
HBase processes when using Kerberos security. There are no default
|
HBase processes when using Kerberos security. There are no default
|
||||||
|
|
|
@ -4663,7 +4663,20 @@ public class TestFromClientSide {
|
||||||
// turn on scan metrics
|
// turn on scan metrics
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
|
scan.setCaching(numRecords+1);
|
||||||
ResultScanner scanner = ht.getScanner(scan);
|
ResultScanner scanner = ht.getScanner(scan);
|
||||||
|
for (Result result : scanner.next(numRecords - 1)) {
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
// need to have at one next roundtrip in order to collect metrics
|
||||||
|
// here we have less than <numRecord>+1 KVs, so no metrics were collected
|
||||||
|
assertNull(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
|
||||||
|
|
||||||
|
// set caching to 1, becasue metrics are collected in each roundtrip only
|
||||||
|
scan = new Scan();
|
||||||
|
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
|
scan.setCaching(1);
|
||||||
|
scanner = ht.getScanner(scan);
|
||||||
// per HBASE-5717, this should still collect even if you don't run all the way to
|
// per HBASE-5717, this should still collect even if you don't run all the way to
|
||||||
// the end of the scanner. So this is asking for 2 of the 3 rows we inserted.
|
// the end of the scanner. So this is asking for 2 of the 3 rows we inserted.
|
||||||
for (Result result : scanner.next(numRecords - 1)) {
|
for (Result result : scanner.next(numRecords - 1)) {
|
||||||
|
@ -4677,6 +4690,7 @@ public class TestFromClientSide {
|
||||||
// now, test that the metrics are still collected even if you don't call close, but do
|
// now, test that the metrics are still collected even if you don't call close, but do
|
||||||
// run past the end of all the records
|
// run past the end of all the records
|
||||||
Scan scanWithoutClose = new Scan();
|
Scan scanWithoutClose = new Scan();
|
||||||
|
scanWithoutClose.setCaching(1);
|
||||||
scanWithoutClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
scanWithoutClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
ResultScanner scannerWithoutClose = ht.getScanner(scanWithoutClose);
|
ResultScanner scannerWithoutClose = ht.getScanner(scanWithoutClose);
|
||||||
for (Result result : scannerWithoutClose.next(numRecords + 1)) {
|
for (Result result : scannerWithoutClose.next(numRecords + 1)) {
|
||||||
|
@ -4688,6 +4702,8 @@ public class TestFromClientSide {
|
||||||
// finally, test that the metrics are collected correctly if you both run past all the records,
|
// finally, test that the metrics are collected correctly if you both run past all the records,
|
||||||
// AND close the scanner
|
// AND close the scanner
|
||||||
Scan scanWithClose = new Scan();
|
Scan scanWithClose = new Scan();
|
||||||
|
// make sure we can set caching up to the number of a scanned values
|
||||||
|
scanWithClose.setCaching(numRecords);
|
||||||
scanWithClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
scanWithClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
ResultScanner scannerWithClose = ht.getScanner(scanWithClose);
|
ResultScanner scannerWithClose = ht.getScanner(scanWithClose);
|
||||||
for (Result result : scannerWithClose.next(numRecords + 1)) {
|
for (Result result : scannerWithClose.next(numRecords + 1)) {
|
||||||
|
|
|
@ -98,6 +98,7 @@ public class TestScannerTimeout {
|
||||||
public void test2481() throws Exception {
|
public void test2481() throws Exception {
|
||||||
LOG.info("START ************ test2481");
|
LOG.info("START ************ test2481");
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
|
scan.setCaching(1);
|
||||||
HTable table =
|
HTable table =
|
||||||
new HTable(new Configuration(TEST_UTIL.getConfiguration()), TABLE_NAME);
|
new HTable(new Configuration(TEST_UTIL.getConfiguration()), TABLE_NAME);
|
||||||
ResultScanner r = table.getScanner(scan);
|
ResultScanner r = table.getScanner(scan);
|
||||||
|
|
Loading…
Reference in New Issue