HBASE-1996 Configure scanner buffer in bytes instead of number of rows
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@895820 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b378ea615f
commit
95ed506323
|
@ -282,6 +282,8 @@ Release 0.21.0 - Unreleased
|
|||
(Kay Kay via Stack)
|
||||
HBASE-2052 Upper bound of outstanding WALs can be overrun
|
||||
HBASE-2086 Job(configuration,String) deprecated (Kay Kay via Stack)
|
||||
HBASE-1996 Configure scanner buffer in bytes instead of number of rows
|
||||
(Erik Rozendaal and Dave Latham via Stack)
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write
|
||||
|
|
|
@ -284,4 +284,20 @@ public interface HConstants {
|
|||
TABLE_SET_HTD,
|
||||
TABLE_SPLIT
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameter name for maximum number of bytes returned when calling a
|
||||
* scanner's next method.
|
||||
*/
|
||||
public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size";
|
||||
|
||||
/**
|
||||
* Maximum number of bytes returned when calling a scanner's next method.
|
||||
* Note that when a single row is larger than this limit the row is still
|
||||
* returned completely.
|
||||
*
|
||||
* The default value is unlimited.
|
||||
*/
|
||||
public static long DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE = Long.MAX_VALUE;
|
||||
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ public class HTable implements HTableInterface {
|
|||
private boolean autoFlush;
|
||||
private long currentWriteBufferSize;
|
||||
protected int scannerCaching;
|
||||
private long maxScannerResultSize;
|
||||
private int maxKeyValueSize;
|
||||
|
||||
/**
|
||||
|
@ -124,6 +125,9 @@ public class HTable implements HTableInterface {
|
|||
this.autoFlush = true;
|
||||
this.currentWriteBufferSize = 0;
|
||||
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
|
||||
this.maxScannerResultSize = conf.getLong(
|
||||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
|
||||
}
|
||||
|
||||
|
@ -855,6 +859,7 @@ public class HTable implements HTableInterface {
|
|||
}
|
||||
if (cache.size() == 0) {
|
||||
Result [] values = null;
|
||||
long remainingResultSize = maxScannerResultSize;
|
||||
int countdown = this.caching;
|
||||
// We need to reset it if it's a new callable that was created
|
||||
// with a countdown in nextScanner
|
||||
|
@ -902,12 +907,15 @@ public class HTable implements HTableInterface {
|
|||
if (values != null && values.length > 0) {
|
||||
for (Result rs : values) {
|
||||
cache.add(rs);
|
||||
for (KeyValue kv : rs.raw()) {
|
||||
remainingResultSize -= kv.heapSize();
|
||||
}
|
||||
countdown--;
|
||||
this.lastResult = rs;
|
||||
}
|
||||
}
|
||||
// Values == null means server-side filter has determined we must STOP
|
||||
} while (countdown > 0 && nextScanner(countdown, values == null));
|
||||
} while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
|
||||
}
|
||||
|
||||
if (cache.size() > 0) {
|
||||
|
|
|
@ -158,6 +158,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
|
||||
protected final int numRegionsToReport;
|
||||
|
||||
private final long maxScannerResultSize;
|
||||
|
||||
// Remote HMaster
|
||||
private HMasterRegionInterface hbaseMaster;
|
||||
|
||||
|
@ -256,6 +258,10 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
|
||||
sleeper = new Sleeper(this.msgInterval, this.stopRequested);
|
||||
|
||||
this.maxScannerResultSize = conf.getLong(
|
||||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||
|
||||
// Task thread to process requests from Master
|
||||
this.worker = new Worker();
|
||||
|
||||
|
@ -1795,12 +1801,16 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
}
|
||||
this.leases.renewLease(scannerName);
|
||||
List<Result> results = new ArrayList<Result>();
|
||||
for (int i = 0; i < nbRows; i++) {
|
||||
long currentScanResultSize = 0;
|
||||
for (int i = 0; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) {
|
||||
requestCount.incrementAndGet();
|
||||
// Collect values to be returned here
|
||||
List<KeyValue> values = new ArrayList<KeyValue>();
|
||||
boolean moreRows = s.next(values);
|
||||
if (!values.isEmpty()) {
|
||||
for (KeyValue kv : values) {
|
||||
currentScanResultSize += kv.heapSize();
|
||||
}
|
||||
results.add(new Result(values));
|
||||
}
|
||||
if (!moreRows) {
|
||||
|
|
Loading…
Reference in New Issue