diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 0ee29f2dc26..8dce8d8fec2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -209,6 +209,11 @@ public class ClientScanner extends AbstractClientScanner { return lastNext; } + @VisibleForTesting + protected long getMaxResultSize() { + return maxScannerResultSize; + } + // returns true if the passed region endKey protected boolean checkScanStopRow(final byte [] endKey) { if (this.scan.getStopRow().length > 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index e8518bd0907..5b0e685c473 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -117,6 +117,7 @@ public class HTable implements HTableInterface { private boolean autoFlush = true; private boolean closed = false; protected int scannerCaching; + protected long scannerMaxResultSize; private ExecutorService pool; // For Multi & Scan private int operationTimeout; private final boolean cleanupPoolOnClose; // shutdown the pool in close() @@ -351,7 +352,7 @@ public class HTable implements HTableInterface { this.operationTimeout = tableName.isSystemTable() ? 
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); this.scannerCaching = tableConfiguration.getScannerCaching(); - + this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); } @@ -780,6 +781,9 @@ public class HTable implements HTableInterface { if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } + if (scan.getMaxResultSize() <= 0) { + scan.setMaxResultSize(scannerMaxResultSize); + } if (scan.isReversed()) { if (scan.isSmall()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 70ad179bfaf..901e86d3043 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -37,6 +37,7 @@ public class TableConfiguration { private final int metaOperationTimeout; private final int operationTimeout; private final int scannerCaching; + private final long scannerMaxResultSize; private final int primaryCallTimeoutMicroSecond; private final int replicaCallTimeoutMicroSecondScan; private final int retries; @@ -59,6 +60,10 @@ public class TableConfiguration { this.scannerCaching = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.scannerMaxResultSize = + conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.primaryCallTimeoutMicroSecond = conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms @@ -82,6 +87,7 @@ public class TableConfiguration { this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = 
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; + this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; this.primaryCallTimeoutMicroSecond = 10000; this.replicaCallTimeoutMicroSecondScan = 1000000; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; @@ -119,4 +125,8 @@ public class TableConfiguration { public int getMaxKeyValueSize() { return maxKeyValueSize; } + + public long getScannerMaxResultSize() { + return scannerMaxResultSize; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 1bdd2fd7a3a..10a8f12c424 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -176,6 +176,56 @@ public class TestScannersFromClientSide { } + @Test + public void testMaxResultSizeIsSetToDefault() throws Exception { + TableName TABLE = TableName.valueOf("testMaxResultSizeIsSetToDefault"); + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + + // The max result size we expect the scan to use by default. + long expectedMaxResultSize = + TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + + int numRows = 5; + byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); + + int numQualifiers = 10; + byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); + + // Specify the cell size such that a single row will be larger than the default + // value of maxResultSize. This means that Scan RPCs should return at most a single + // result back to the client. 
+ int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); + byte[] cellValue = Bytes.createMaxByteArray(cellSize); + + Put put; + List puts = new ArrayList(); + for (int row = 0; row < ROWS.length; row++) { + put = new Put(ROWS[row]); + for (int qual = 0; qual < QUALIFIERS.length; qual++) { + KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); + put.add(kv); + } + puts.add(put); + } + ht.put(puts); + + // Create a scan with the default configuration. + Scan scan = new Scan(); + + ResultScanner scanner = ht.getScanner(scan); + assertTrue(scanner instanceof ClientScanner); + ClientScanner clientScanner = (ClientScanner) scanner; + + // Call next to issue a single RPC to the server + scanner.next(); + + // The scanner should have, at most, a single result in its cache. If more results exist + // in the cache, it means that more than the expected max result size was fetched. + assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", + clientScanner.getCacheSize() <= 1); + } + @Test public void testSmallScan() throws Exception { TableName TABLE = TableName.valueOf("testSmallScan");