HBASE-6066 some low hanging read path improvement ideas
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1434415 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bda467aeca
commit
7b17ed9e91
|
@ -3434,7 +3434,6 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
private final KeyValue KV_LIMIT = new KeyValue();
|
private final KeyValue KV_LIMIT = new KeyValue();
|
||||||
private final byte [] stopRow;
|
private final byte [] stopRow;
|
||||||
private Filter filter;
|
private Filter filter;
|
||||||
private List<KeyValue> results = new ArrayList<KeyValue>();
|
|
||||||
private int batch;
|
private int batch;
|
||||||
private int isScan;
|
private int isScan;
|
||||||
private boolean filterClosed = false;
|
private boolean filterClosed = false;
|
||||||
|
@ -3561,11 +3560,16 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
@Override
|
@Override
|
||||||
public boolean nextRaw(List<KeyValue> outResults, int limit,
|
public boolean nextRaw(List<KeyValue> outResults, int limit,
|
||||||
String metric) throws IOException {
|
String metric) throws IOException {
|
||||||
results.clear();
|
boolean returnResult;
|
||||||
|
if (outResults.isEmpty()) {
|
||||||
boolean returnResult = nextInternal(limit, metric);
|
// Usually outResults is empty. This is true when next is called
|
||||||
|
// to handle scan or get operation.
|
||||||
outResults.addAll(results);
|
returnResult = nextInternal(outResults, limit, metric);
|
||||||
|
} else {
|
||||||
|
List<KeyValue> tmpList = new ArrayList<KeyValue>();
|
||||||
|
returnResult = nextInternal(tmpList, limit, metric);
|
||||||
|
outResults.addAll(tmpList);
|
||||||
|
}
|
||||||
resetFilters();
|
resetFilters();
|
||||||
if (isFilterDone()) {
|
if (isFilterDone()) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -3587,10 +3591,12 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
return next(outResults, batch, metric);
|
return next(outResults, batch, metric);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void populateFromJoinedHeap(int limit, String metric) throws IOException {
|
private void populateFromJoinedHeap(List<KeyValue> results, int limit, String metric)
|
||||||
|
throws IOException {
|
||||||
assert joinedContinuationRow != null;
|
assert joinedContinuationRow != null;
|
||||||
KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(),
|
KeyValue kv = populateResult(results, this.joinedHeap, limit,
|
||||||
joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric);
|
joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
|
||||||
|
joinedContinuationRow.getRowLength(), metric);
|
||||||
if (kv != KV_LIMIT) {
|
if (kv != KV_LIMIT) {
|
||||||
// We are done with this row, reset the continuation.
|
// We are done with this row, reset the continuation.
|
||||||
joinedContinuationRow = null;
|
joinedContinuationRow = null;
|
||||||
|
@ -3602,6 +3608,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches records with currentRow into results list, until next row or limit (if not -1).
|
* Fetches records with currentRow into results list, until next row or limit (if not -1).
|
||||||
|
* @param results
|
||||||
* @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
|
* @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
|
||||||
* @param limit Max amount of KVs to place in result list, -1 means no limit.
|
* @param limit Max amount of KVs to place in result list, -1 means no limit.
|
||||||
* @param currentRow Byte array with key we are fetching.
|
* @param currentRow Byte array with key we are fetching.
|
||||||
|
@ -3610,8 +3617,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* @param metric Metric key to be passed into KeyValueHeap::next().
|
* @param metric Metric key to be passed into KeyValueHeap::next().
|
||||||
* @return KV_LIMIT if limit reached, next KeyValue otherwise.
|
* @return KV_LIMIT if limit reached, next KeyValue otherwise.
|
||||||
*/
|
*/
|
||||||
private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset,
|
private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
|
||||||
short length, String metric) throws IOException {
|
byte[] currentRow, int offset, short length, String metric) throws IOException {
|
||||||
KeyValue nextKv;
|
KeyValue nextKv;
|
||||||
do {
|
do {
|
||||||
heap.next(results, limit - results.size(), metric);
|
heap.next(results, limit - results.size(), metric);
|
||||||
|
@ -3631,7 +3638,11 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
return this.filter != null && this.filter.filterAllRemaining();
|
return this.filter != null && this.filter.filterAllRemaining();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean nextInternal(int limit, String metric) throws IOException {
|
private boolean nextInternal(List<KeyValue> results, int limit, String metric)
|
||||||
|
throws IOException {
|
||||||
|
if (!results.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("First parameter should be an empty list");
|
||||||
|
}
|
||||||
RpcCallContext rpcCall = HBaseServer.getCurrentCall();
|
RpcCallContext rpcCall = HBaseServer.getCurrentCall();
|
||||||
// The loop here is used only when at some point during the next we determine
|
// The loop here is used only when at some point during the next we determine
|
||||||
// that due to effects of filters or otherwise, we have an empty row in the result.
|
// that due to effects of filters or otherwise, we have an empty row in the result.
|
||||||
|
@ -3674,11 +3685,12 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// Technically, if we hit limits before on this row, we don't need this call.
|
// Technically, if we hit limits before on this row, we don't need this call.
|
||||||
if (filterRowKey(currentRow, offset, length)) {
|
if (filterRowKey(currentRow, offset, length)) {
|
||||||
nextRow(currentRow, offset, length);
|
nextRow(currentRow, offset, length);
|
||||||
|
results.clear();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length,
|
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
|
||||||
metric);
|
length, metric);
|
||||||
// Ok, we are good, let's try to get some results from the main heap.
|
// Ok, we are good, let's try to get some results from the main heap.
|
||||||
if (nextKv == KV_LIMIT) {
|
if (nextKv == KV_LIMIT) {
|
||||||
if (this.filter != null && filter.hasFilterRow()) {
|
if (this.filter != null && filter.hasFilterRow()) {
|
||||||
|
@ -3699,13 +3711,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
filter.filterRow(results);
|
filter.filterRow(results);
|
||||||
}
|
}
|
||||||
if (isEmptyRow) {
|
if (isEmptyRow) {
|
||||||
// this seems like a redundant step - we already consumed the row
|
|
||||||
// there're no left overs.
|
|
||||||
// the reasons for calling this method are:
|
|
||||||
// 1. reset the filters.
|
|
||||||
// 2. provide a hook to fast forward the row (used by subclasses)
|
|
||||||
nextRow(currentRow, offset, length);
|
nextRow(currentRow, offset, length);
|
||||||
|
results.clear();
|
||||||
// This row was totally filtered out, if this is NOT the last row,
|
// This row was totally filtered out, if this is NOT the last row,
|
||||||
// we should continue on. Otherwise, nothing else to do.
|
// we should continue on. Otherwise, nothing else to do.
|
||||||
if (!stopRow) continue;
|
if (!stopRow) continue;
|
||||||
|
@ -3725,12 +3732,12 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|| this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
|
|| this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
|
||||||
if (mayHaveData) {
|
if (mayHaveData) {
|
||||||
joinedContinuationRow = current;
|
joinedContinuationRow = current;
|
||||||
populateFromJoinedHeap(limit, metric);
|
populateFromJoinedHeap(results, limit, metric);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Populating from the joined heap was stopped by limits, populate some more.
|
// Populating from the joined heap was stopped by limits, populate some more.
|
||||||
populateFromJoinedHeap(limit, metric);
|
populateFromJoinedHeap(results, limit, metric);
|
||||||
}
|
}
|
||||||
|
|
||||||
// We may have just called populateFromJoinedMap and hit the limits. If that is
|
// We may have just called populateFromJoinedMap and hit the limits. If that is
|
||||||
|
@ -3763,7 +3770,6 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
|
while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
|
||||||
this.storeHeap.next(MOCKED_LIST);
|
this.storeHeap.next(MOCKED_LIST);
|
||||||
}
|
}
|
||||||
results.clear();
|
|
||||||
resetFilters();
|
resetFilters();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3019,8 +3019,10 @@ public class HRegionServer implements ClientProtocol,
|
||||||
scanner, results, rows);
|
scanner, results, rows);
|
||||||
if (!results.isEmpty()) {
|
if (!results.isEmpty()) {
|
||||||
for (Result r : results) {
|
for (Result r : results) {
|
||||||
for (KeyValue kv : r.raw()) {
|
if (maxScannerResultSize < Long.MAX_VALUE){
|
||||||
currentScanResultSize += kv.heapSize();
|
for (KeyValue kv : r.raw()) {
|
||||||
|
currentScanResultSize += kv.heapSize();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3045,8 +3047,10 @@ public class HRegionServer implements ClientProtocol,
|
||||||
// Collect values to be returned here
|
// Collect values to be returned here
|
||||||
boolean moreRows = scanner.nextRaw(values);
|
boolean moreRows = scanner.nextRaw(values);
|
||||||
if (!values.isEmpty()) {
|
if (!values.isEmpty()) {
|
||||||
for (KeyValue kv : values) {
|
if (maxScannerResultSize < Long.MAX_VALUE){
|
||||||
currentScanResultSize += kv.heapSize();
|
for (KeyValue kv : values) {
|
||||||
|
currentScanResultSize += kv.heapSize();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
results.add(new Result(values));
|
results.add(new Result(values));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue