HBASE-5664 CP hooks in Scan flow for fast forward when filter filters out a row (Anoop)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1437790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-01-23 23:01:50 +00:00
parent 507565e35f
commit c637f33c8c
4 changed files with 77 additions and 10 deletions

View File

@ -314,6 +314,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
return hasMore; return hasMore;
} }
@Override
public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException {
return hasMore;
}
@Override @Override
public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> e, public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s) throws IOException { final InternalScanner s) throws IOException {

View File

@ -734,6 +734,27 @@ public interface RegionObserver extends Coprocessor {
final boolean hasNext) final boolean hasNext)
throws IOException; throws IOException;
/**
* This will be called by the scan flow when the current scanned row is being filtered out by the
* filter. The filter may be filtering out the row via any of the below scenarios
* <ol>
* <li>
* <code>boolean filterRowKey(byte [] buffer, int offset, int length)</code> returning true</li>
* <li>
* <code>boolean filterRow()</code> returning true</li>
* <li>
* <code>void filterRow(List<KeyValue> kvs)</code> removing all the kvs from the passed List</li>
* </ol>
* @param c the environment provided by the region server
* @param s the scanner
* @param currentRow The current rowkey which got filtered out
* @param hasMore the 'has more' indication
* @return whether more rows are available for the scanner or not
* @throws IOException
*/
boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> c,
final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException;
/** /**
* Called before the client closes a scanner. * Called before the client closes a scanner.
* <p> * <p>

View File

@ -1774,7 +1774,7 @@ public class HRegion implements HeapSize { // , Writable{
protected RegionScanner instantiateRegionScanner(Scan scan, protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException { List<KeyValueScanner> additionalScanners) throws IOException {
return new RegionScannerImpl(scan, additionalScanners); return new RegionScannerImpl(scan, additionalScanners, this);
} }
/* /*
@ -3380,13 +3380,16 @@ public class HRegion implements HeapSize { // , Writable{
private boolean filterClosed = false; private boolean filterClosed = false;
private long readPt; private long readPt;
private long maxResultSize; private long maxResultSize;
private HRegion region;
public HRegionInfo getRegionInfo() { public HRegionInfo getRegionInfo() {
return regionInfo; return regionInfo;
} }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.<init>"); RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
// DebugPrint.println("HRegionScanner.<init>");
this.region = region;
this.maxResultSize = scan.getMaxResultSize(); this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) { if (scan.hasFilter()) {
this.filter = new FilterWrapper(scan.getFilter()); this.filter = new FilterWrapper(scan.getFilter());
@ -3443,8 +3446,8 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
RegionScannerImpl(Scan scan) throws IOException { RegionScannerImpl(Scan scan, HRegion region) throws IOException {
this(scan, null); this(scan, null, region);
} }
@Override @Override
@ -3625,7 +3628,8 @@ public class HRegion implements HeapSize { // , Writable{
// Check if rowkey filter wants to exclude this row. If so, loop to next. // Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call. // Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) { if (filterRowKey(currentRow, offset, length)) {
nextRow(currentRow, offset, length); boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear(); results.clear();
continue; continue;
} }
@ -3652,7 +3656,8 @@ public class HRegion implements HeapSize { // , Writable{
filter.filterRow(results); filter.filterRow(results);
} }
if (isEmptyRow) { if (isEmptyRow) {
nextRow(currentRow, offset, length); boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear(); results.clear();
// This row was totally filtered out, if this is NOT the last row, // This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do. // we should continue on. Otherwise, nothing else to do.
@ -3691,7 +3696,8 @@ public class HRegion implements HeapSize { // , Writable{
// Double check to prevent empty rows from appearing in result. It could be // Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used. // the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) { if (results.isEmpty()) {
nextRow(currentRow, offset, length); boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
if (!stopRow) continue; if (!stopRow) continue;
} }
@ -3705,13 +3711,18 @@ public class HRegion implements HeapSize { // , Writable{
&& filter.filterRowKey(row, offset, length); && filter.filterRowKey(row, offset, length);
} }
protected void nextRow(byte [] currentRow, int offset, short length) throws IOException { protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
KeyValue next; KeyValue next;
while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) { while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
this.storeHeap.next(MOCKED_LIST); this.storeHeap.next(MOCKED_LIST);
} }
resetFilters(); resetFilters();
// Calling the hook in CP which allows it to do a fast forward
if (this.region.getCoprocessorHost() != null) {
return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow);
}
return true;
} }
private boolean isStopRow(byte [] currentRow, int offset, short length) { private boolean isStopRow(byte [] currentRow, int offset, short length) {

View File

@ -1349,6 +1349,35 @@ public class RegionCoprocessorHost
return hasMore; return hasMore;
} }
/**
* This will be called by the scan flow when the current scanned row is being filtered out by the
* filter.
* @param s the scanner
* @param currentRow The current rowkey which got filtered out
* @return whether more rows are available for the scanner or not
* @throws IOException
*/
public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
throws IOException {
boolean hasMore = true; // By default assume more rows there.
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
hasMore);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return hasMore;
}
/** /**
* @param s the scanner * @param s the scanner
* @return true if default behavior should be bypassed, false otherwise * @return true if default behavior should be bypassed, false otherwise