From 7b17ed9e91f324ad2aac258bc5df0bf375e841d2 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 16 Jan 2013 21:36:57 +0000 Subject: [PATCH] HBASE-6066 some low hanging read path improvement ideas git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1434415 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegion.java | 52 +++++++++++-------- .../hbase/regionserver/HRegionServer.java | 12 +++-- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e2703cdd2d3..f5005e0f1e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3434,7 +3434,6 @@ public class HRegion implements HeapSize { // , Writable{ private final KeyValue KV_LIMIT = new KeyValue(); private final byte [] stopRow; private Filter filter; - private List results = new ArrayList(); private int batch; private int isScan; private boolean filterClosed = false; @@ -3561,11 +3560,16 @@ public class HRegion implements HeapSize { // , Writable{ @Override public boolean nextRaw(List outResults, int limit, String metric) throws IOException { - results.clear(); - - boolean returnResult = nextInternal(limit, metric); - - outResults.addAll(results); + boolean returnResult; + if (outResults.isEmpty()) { + // Usually outResults is empty. This is true when next is called + // to handle scan or get operation. + returnResult = nextInternal(outResults, limit, metric); + } else { + List tmpList = new ArrayList(); + returnResult = nextInternal(tmpList, limit, metric); + outResults.addAll(tmpList); + } resetFilters(); if (isFilterDone()) { return false; @@ -3587,10 +3591,12 @@ public class HRegion implements HeapSize { // , Writable{ return next(outResults, batch, metric); } - private void populateFromJoinedHeap(int limit, String metric) throws IOException { + private void populateFromJoinedHeap(List results, int limit, String metric) + throws IOException { assert joinedContinuationRow != null; - KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(), - joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric); + KeyValue kv = populateResult(results, this.joinedHeap, limit, + joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(), + joinedContinuationRow.getRowLength(), metric); if (kv != KV_LIMIT) { // We are done with this row, reset the continuation. joinedContinuationRow = null; @@ -3602,6 +3608,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Fetches records with currentRow into results list, until next row or limit (if not -1). + * @param results * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. * @param limit Max amount of KVs to place in result list, -1 means no limit. * @param currentRow Byte array with key we are fetching. @@ -3610,8 +3617,8 @@ public class HRegion implements HeapSize { // , Writable{ * @param metric Metric key to be passed into KeyValueHeap::next(). * @return KV_LIMIT if limit reached, next KeyValue otherwise. */ - private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset, - short length, String metric) throws IOException { + private KeyValue populateResult(List results, KeyValueHeap heap, int limit, + byte[] currentRow, int offset, short length, String metric) throws IOException { KeyValue nextKv; do { heap.next(results, limit - results.size(), metric); @@ -3631,7 +3638,11 @@ public class HRegion implements HeapSize { // , Writable{ return this.filter != null && this.filter.filterAllRemaining(); } - private boolean nextInternal(int limit, String metric) throws IOException { + private boolean nextInternal(List results, int limit, String metric) + throws IOException { + if (!results.isEmpty()) { + throw new IllegalArgumentException("First parameter should be an empty list"); + } RpcCallContext rpcCall = HBaseServer.getCurrentCall(); // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. @@ -3674,11 +3685,12 @@ public class HRegion implements HeapSize { // , Writable{ // Technically, if we hit limits before on this row, we don't need this call. if (filterRowKey(currentRow, offset, length)) { nextRow(currentRow, offset, length); + results.clear(); continue; } - KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length, - metric); + KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset, + length, metric); // Ok, we are good, let's try to get some results from the main heap. if (nextKv == KV_LIMIT) { if (this.filter != null && filter.hasFilterRow()) { @@ -3699,13 +3711,8 @@ public class HRegion implements HeapSize { // , Writable{ filter.filterRow(results); } if (isEmptyRow) { - // this seems like a redundant step - we already consumed the row - // there're no left overs. - // the reasons for calling this method are: - // 1. reset the filters. - // 2. provide a hook to fast forward the row (used by subclasses) nextRow(currentRow, offset, length); - + results.clear(); // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. if (!stopRow) continue; @@ -3725,12 +3732,12 @@ public class HRegion implements HeapSize { // , Writable{ || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length)); if (mayHaveData) { joinedContinuationRow = current; - populateFromJoinedHeap(limit, metric); + populateFromJoinedHeap(results, limit, metric); } } } else { // Populating from the joined heap was stopped by limits, populate some more. - populateFromJoinedHeap(limit, metric); + populateFromJoinedHeap(results, limit, metric); } // We may have just called populateFromJoinedMap and hit the limits. If that is @@ -3763,7 +3770,6 @@ public class HRegion implements HeapSize { // , Writable{ while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) { this.storeHeap.next(MOCKED_LIST); } - results.clear(); resetFilters(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 0a7409944ef..2c6ce076920 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3019,8 +3019,10 @@ public class HRegionServer implements ClientProtocol, scanner, results, rows); if (!results.isEmpty()) { for (Result r : results) { - for (KeyValue kv : r.raw()) { - currentScanResultSize += kv.heapSize(); + if (maxScannerResultSize < Long.MAX_VALUE){ + for (KeyValue kv : r.raw()) { + currentScanResultSize += kv.heapSize(); + } } } } @@ -3045,8 +3047,10 @@ public class HRegionServer implements ClientProtocol, // Collect values to be returned here boolean moreRows = scanner.nextRaw(values); if (!values.isEmpty()) { - for (KeyValue kv : values) { - currentScanResultSize += kv.heapSize(); + if (maxScannerResultSize < Long.MAX_VALUE){ + for (KeyValue kv : values) { + currentScanResultSize += kv.heapSize(); + } } results.add(new Result(values)); }