HBASE-13527 The default value for hbase.client.scanner.max.result.size is never actually set on Scans

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Jonathan Lawlor 2015-04-21 17:59:09 -07:00 committed by stack
parent 30880bca55
commit c8d8499da8
4 changed files with 70 additions and 1 deletions

View File

@ -209,6 +209,11 @@ public class ClientScanner extends AbstractClientScanner {
return lastNext; return lastNext;
} }
@VisibleForTesting
protected long getMaxResultSize() {
return maxScannerResultSize;
}
// returns true if the passed region endKey // returns true if the passed region endKey
protected boolean checkScanStopRow(final byte [] endKey) { protected boolean checkScanStopRow(final byte [] endKey) {
if (this.scan.getStopRow().length > 0) { if (this.scan.getStopRow().length > 0) {

View File

@ -117,6 +117,7 @@ public class HTable implements HTableInterface {
private boolean autoFlush = true; private boolean autoFlush = true;
private boolean closed = false; private boolean closed = false;
protected int scannerCaching; protected int scannerCaching;
protected long scannerMaxResultSize;
private ExecutorService pool; // For Multi & Scan private ExecutorService pool; // For Multi & Scan
private int operationTimeout; private int operationTimeout;
private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupPoolOnClose; // shutdown the pool in close()
@ -351,7 +352,7 @@ public class HTable implements HTableInterface {
this.operationTimeout = tableName.isSystemTable() ? this.operationTimeout = tableName.isSystemTable() ?
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
this.scannerCaching = tableConfiguration.getScannerCaching(); this.scannerCaching = tableConfiguration.getScannerCaching();
this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) { if (this.rpcCallerFactory == null) {
this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
} }
@ -780,6 +781,9 @@ public class HTable implements HTableInterface {
if (scan.getCaching() <= 0) { if (scan.getCaching() <= 0) {
scan.setCaching(getScannerCaching()); scan.setCaching(getScannerCaching());
} }
if (scan.getMaxResultSize() <= 0) {
scan.setMaxResultSize(scannerMaxResultSize);
}
if (scan.isReversed()) { if (scan.isReversed()) {
if (scan.isSmall()) { if (scan.isSmall()) {

View File

@ -37,6 +37,7 @@ public class TableConfiguration {
private final int metaOperationTimeout; private final int metaOperationTimeout;
private final int operationTimeout; private final int operationTimeout;
private final int scannerCaching; private final int scannerCaching;
private final long scannerMaxResultSize;
private final int primaryCallTimeoutMicroSecond; private final int primaryCallTimeoutMicroSecond;
private final int replicaCallTimeoutMicroSecondScan; private final int replicaCallTimeoutMicroSecondScan;
private final int retries; private final int retries;
@ -59,6 +60,10 @@ public class TableConfiguration {
this.scannerCaching = conf.getInt( this.scannerCaching = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.scannerMaxResultSize =
conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.primaryCallTimeoutMicroSecond = this.primaryCallTimeoutMicroSecond =
conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
@ -82,6 +87,7 @@ public class TableConfiguration {
this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
this.primaryCallTimeoutMicroSecond = 10000; this.primaryCallTimeoutMicroSecond = 10000;
this.replicaCallTimeoutMicroSecondScan = 1000000; this.replicaCallTimeoutMicroSecondScan = 1000000;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
@ -119,4 +125,8 @@ public class TableConfiguration {
public int getMaxKeyValueSize() { public int getMaxKeyValueSize() {
return maxKeyValueSize; return maxKeyValueSize;
} }
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
} }

View File

@ -176,6 +176,56 @@ public class TestScannersFromClientSide {
} }
@Test
public void testMaxResultSizeIsSetToDefault() throws Exception {
TableName TABLE = TableName.valueOf("testMaxResultSizeIsSetToDefault");
Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
// The max result size we expect the scan to use by default.
long expectedMaxResultSize =
TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
int numRows = 5;
byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
int numQualifiers = 10;
byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
// Specify the cell size such that a single row will be larger than the default
// value of maxResultSize. This means that Scan RPCs should return at most a single
// result back to the client.
int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
byte[] cellValue = Bytes.createMaxByteArray(cellSize);
Put put;
List<Put> puts = new ArrayList<Put>();
for (int row = 0; row < ROWS.length; row++) {
put = new Put(ROWS[row]);
for (int qual = 0; qual < QUALIFIERS.length; qual++) {
KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
put.add(kv);
}
puts.add(put);
}
ht.put(puts);
// Create a scan with the default configuration.
Scan scan = new Scan();
ResultScanner scanner = ht.getScanner(scan);
assertTrue(scanner instanceof ClientScanner);
ClientScanner clientScanner = (ClientScanner) scanner;
// Call next to issue a single RPC to the server
scanner.next();
// The scanner should have, at most, a single result in its cache. If there more results exists
// in the cache it means that more than the expected max result size was fetched.
assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results",
clientScanner.getCacheSize() <= 1);
}
@Test @Test
public void testSmallScan() throws Exception { public void testSmallScan() throws Exception {
TableName TABLE = TableName.valueOf("testSmallScan"); TableName TABLE = TableName.valueOf("testSmallScan");