HBASE-7180 RegionScannerImpl.next() is inefficient.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1420004 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fb3663bfd4
commit
b017ee8e9a
|
@ -3467,6 +3467,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return maxResultSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMvccReadPoint() {
|
||||
return this.readPt;
|
||||
}
|
||||
/**
|
||||
* Reset both the filter and the old filter.
|
||||
*/
|
||||
|
@ -3477,7 +3481,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults, int limit)
|
||||
public boolean next(List<KeyValue> outResults, int limit)
|
||||
throws IOException {
|
||||
return next(outResults, limit, null);
|
||||
}
|
||||
|
@ -3497,30 +3501,42 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// This could be a new thread from the last time we called next().
|
||||
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
|
||||
|
||||
results.clear();
|
||||
|
||||
boolean returnResult = nextInternal(limit, metric);
|
||||
|
||||
outResults.addAll(results);
|
||||
resetFilters();
|
||||
if (isFilterDone()) {
|
||||
return false;
|
||||
}
|
||||
return returnResult;
|
||||
return nextRaw(outResults, limit, metric);
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults)
|
||||
public boolean nextRaw(List<KeyValue> outResults)
|
||||
throws IOException {
|
||||
return nextRaw(outResults, batch, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> outResults, int limit,
|
||||
String metric) throws IOException {
|
||||
results.clear();
|
||||
|
||||
boolean returnResult = nextInternal(limit, metric);
|
||||
|
||||
outResults.addAll(results);
|
||||
resetFilters();
|
||||
if (isFilterDone()) {
|
||||
return false;
|
||||
}
|
||||
return returnResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> outResults)
|
||||
throws IOException {
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults, String metric)
|
||||
public boolean next(List<KeyValue> outResults, String metric)
|
||||
throws IOException {
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch, metric);
|
||||
|
@ -5274,7 +5290,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @throws RegionTooBusyException if failed to get the lock in time
|
||||
* @throws InterruptedIOException if interrupted while waiting for a lock
|
||||
*/
|
||||
private void startRegionOperation()
|
||||
public void startRegionOperation()
|
||||
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
|
||||
if (this.closing.get()) {
|
||||
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
|
||||
|
@ -5292,7 +5308,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Closes the lock. This needs to be called in the finally block corresponding
|
||||
* to the try block of #startRegionOperation
|
||||
*/
|
||||
private void closeRegionOperation() {
|
||||
public void closeRegionOperation() {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
|
|
|
@ -2912,20 +2912,30 @@ public class HRegionServer implements ClientProtocol,
|
|||
maxResultSize = maxScannerResultSize;
|
||||
}
|
||||
List<KeyValue> values = new ArrayList<KeyValue>();
|
||||
for (int i = 0; i < rows
|
||||
&& currentScanResultSize < maxResultSize; i++) {
|
||||
// Collect values to be returned here
|
||||
boolean moreRows = scanner.next(values);
|
||||
if (!values.isEmpty()) {
|
||||
for (KeyValue kv : values) {
|
||||
currentScanResultSize += kv.heapSize();
|
||||
MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
|
||||
region.startRegionOperation();
|
||||
try {
|
||||
int i = 0;
|
||||
synchronized(scanner) {
|
||||
for (; i < rows
|
||||
&& currentScanResultSize < maxResultSize; i++) {
|
||||
// Collect values to be returned here
|
||||
boolean moreRows = scanner.nextRaw(values);
|
||||
if (!values.isEmpty()) {
|
||||
for (KeyValue kv : values) {
|
||||
currentScanResultSize += kv.heapSize();
|
||||
}
|
||||
results.add(new Result(values));
|
||||
}
|
||||
if (!moreRows) {
|
||||
break;
|
||||
}
|
||||
values.clear();
|
||||
}
|
||||
results.add(new Result(values));
|
||||
}
|
||||
if (!moreRows) {
|
||||
break;
|
||||
}
|
||||
values.clear();
|
||||
region.readRequestsCount.add(i);
|
||||
} finally {
|
||||
region.closeRegionOperation();
|
||||
}
|
||||
|
||||
// coprocessor postNext hook
|
||||
|
|
|
@ -19,9 +19,11 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
||||
/**
|
||||
|
@ -56,4 +58,50 @@ public interface RegionScanner extends InternalScanner {
|
|||
* @return The preferred max buffersize. See {@link Scan#setMaxResultSize(long)}
|
||||
*/
|
||||
public long getMaxResultSize();
|
||||
|
||||
/**
|
||||
* @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl}
|
||||
*/
|
||||
public long getMvccReadPoint();
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values with the default limit on the number of values
|
||||
* to return.
|
||||
* This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
|
||||
* Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
|
||||
* See {@link #nextRaw(List, int, String)}
|
||||
* @param result return output array
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean nextRaw(List<KeyValue> result) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values with a limit on the number of values
|
||||
* to return.
|
||||
* This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
|
||||
* Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
|
||||
* Example:
|
||||
* <code><pre>
|
||||
* HRegion region = ...;
|
||||
* RegionScanner scanner = ...
|
||||
* MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
|
||||
* region.startRegionOperation();
|
||||
* try {
|
||||
* synchronized(scanner) {
|
||||
* ...
|
||||
* boolean moreRows = scanner.nextRaw(values);
|
||||
* ...
|
||||
* }
|
||||
* } finally {
|
||||
* region.closeRegionOperation();
|
||||
* }
|
||||
* </pre></code>
|
||||
* @param result return output array
|
||||
* @param limit limit on row count to get
|
||||
* @param metric the metric name
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
|
||||
}
|
||||
|
|
|
@ -98,6 +98,18 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
return delegate.next(result, limit, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> result, int limit, String metric)
|
||||
throws IOException {
|
||||
return delegate.nextRaw(result, limit, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> result)
|
||||
throws IOException {
|
||||
return delegate.nextRaw(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegate.close();
|
||||
|
@ -123,6 +135,10 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
return delegate.getMaxResultSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMvccReadPoint() {
|
||||
return delegate.getMvccReadPoint();
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoprocessorImpl extends BaseRegionObserver {
|
||||
|
|
Loading…
Reference in New Issue